Skip to content

Commit

Permalink
chore: add trace logs to mss and remove commented perf code
Browse files Browse the repository at this point in the history
  • Loading branch information
achingbrain committed Nov 17, 2023
1 parent 6625a27 commit adea7bb
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 62 deletions.
4 changes: 2 additions & 2 deletions packages/multistream-select/src/handle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ export async function handle <Stream extends Duplex<any, any, any>> (stream: Str
}

if (protocols.includes(protocol)) {
await multistream.write(lp, uint8ArrayFromString(`${protocol}\n`), options)
options.log.trace('respond with "%s" for "%s"', protocol, protocol)
await multistream.write(lp, uint8ArrayFromString(`${protocol}\n`), options)

return { stream: lp.unwrap(), protocol }
}
Expand All @@ -89,7 +89,7 @@ export async function handle <Stream extends Duplex<any, any, any>> (stream: Str
continue
}

await multistream.write(lp, uint8ArrayFromString('na\n'), options)
options.log('respond with "na" for "%s"', protocol)
await multistream.write(lp, uint8ArrayFromString('na\n'), options)
}
}
3 changes: 3 additions & 0 deletions packages/multistream-select/src/select.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,13 @@ export async function select <Stream extends Duplex<any, any, any>> (stream: Str
const p2 = uint8ArrayFromString(`${protocol}\n`)
await multistream.writeAll(lp, [p1, p2], options)

options?.log.trace('select: reading multistream-select header')
let response = await multistream.readString(lp, options)
options?.log.trace('select: read "%s"', response)

// Read the protocol response if we got the protocolId in return
if (response === PROTOCOL_ID) {
options?.log.trace('select: reading protocol response')
response = await multistream.readString(lp, options)
options?.log.trace('select: read "%s"', response)
}
Expand All @@ -84,6 +86,7 @@ export async function select <Stream extends Duplex<any, any, any>> (stream: Str
for (const protocol of protocols) {
options?.log.trace('select: write "%s"', protocol)
await multistream.write(lp, uint8ArrayFromString(`${protocol}\n`), options)
options?.log.trace('select: reading protocol response')
const response = await multistream.readString(lp, options)
options?.log.trace('select: read "%s" for "%s"', response, protocol)

Expand Down
72 changes: 12 additions & 60 deletions packages/protocol-perf/src/perf-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,16 +97,23 @@ export class Perf implements Startable, PerfInterface {
const writeBlockSize = this.writeBlockSize

const initialStartTime = Date.now()
let lastReportedTime = Date.now()
const connection = await this.components.connectionManager.openConnection(ma, {
...options,
force: options.reuseExistingConnection !== true
})

this.log('opened connection after %d ms', Date.now() - lastReportedTime)
lastReportedTime = Date.now()

const stream = await connection.newStream(this.protocol, options)

this.log('opened stream after %d ms', Date.now() - lastReportedTime)
lastReportedTime = Date.now()

let lastAmountOfBytesSent = 0
let lastReportedTime = Date.now()
let totalBytesSent = 0
const uploadStart = Date.now()

// tell the remote how many bytes we will send. Up cast to 64 bit number
// as if we send as ui32 we limit total transfer size to 4GB
Expand Down Expand Up @@ -160,71 +167,14 @@ export class Perf implements Startable, PerfInterface {

yield * output

/*
const b = byteStream(stream)
await b.write(uint8Buf.subarray(0, 8), options)
while (sendBytes > 0) {
let toSend: number = writeBlockSize
if (toSend > sendBytes) {
toSend = sendBytes
}
const chunk = uint8Buf.subarray(0, toSend)
await b.write(chunk, options)
sendBytes -= toSend
if (Date.now() - lastReportedTime > 1000) {
yield {
type: 'intermediary',
timeSeconds: (Date.now() - lastReportedTime) / 1000,
uploadBytes: lastAmountOfBytesSent,
downloadBytes: 0
}
// record last reported time after `console.log` because it can
// affect benchmark timings
lastReportedTime = Date.now()
lastAmountOfBytesSent = 0
}
lastAmountOfBytesSent += toSend
totalBytesSent += toSend
}
// sent all the bytes, close the write end of the stream
await b.unwrap().closeWrite()
this.log('upload complete after %d ms', Date.now() - uploadStart)

*/
// Read the received bytes
let lastAmountOfBytesReceived = 0
lastReportedTime = Date.now()
let totalBytesReceived = 0
/*
while (totalBytesReceived < receiveBytes) {
const buf = await b.read(1024, options)
if (Date.now() - lastReportedTime > 1000) {
yield {
type: 'intermediary',
timeSeconds: (Date.now() - lastReportedTime) / 1000,
uploadBytes: 0,
downloadBytes: lastAmountOfBytesReceived
}
const downloadStart = Date.now()

// record last reported time after `console.log` because it can
// affect benchmark timings
lastReportedTime = Date.now()
lastAmountOfBytesReceived = 0
}
lastAmountOfBytesReceived += buf.byteLength
totalBytesReceived += buf.byteLength
}
*/
for await (const buf of stream.source) {
if (Date.now() - lastReportedTime > 1000) {
yield {
Expand All @@ -244,6 +194,8 @@ export class Perf implements Startable, PerfInterface {
totalBytesReceived += buf.byteLength
}

this.log('download complete after %d ms', Date.now() - downloadStart)

if (totalBytesReceived !== receiveBytes) {
throw new Error(`Expected to receive ${receiveBytes} bytes, but received ${totalBytesReceived}`)
}
Expand Down

0 comments on commit adea7bb

Please sign in to comment.