-
Notifications
You must be signed in to change notification settings - Fork 3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feat/pull mplex #4
Conversation
@libp2p/javascript-team is this still of any interest to the libp2p team, if not can I publish this as a standalone module? |
@dryajov it is! We were talking about this last week. I'd like to try and get this finished up in the next few weeks in relation to libp2p/js-libp2p-mplex#89 |
Awesome! I'm here to assist with whatever I can. |
@dryajov just a heads up, I'm working on finalizing this this week. I hit some errors (oddly when updating just aegir) that I'm working through. I also think it might be beneficial to have this implement the |
need to figure out the best way to handle the error and if we should be emitting the errors
// is handling errors. In node 10+ if we dont catch this | ||
// error, it will be thrown. Perhaps channel shouldnt be | ||
// emitting errors? | ||
}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was needed to fix an error with the test suite, because there was no error listener on channel. I need to do some more testing, but I don't think the channel needs to be emitting errors. It appears to be handling everything internally and reseting the stream properly, so I think it's probably redundant anyway.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the Channel should be emitting errors, but those needs to be catched here and logged. A lot of channel erroring might be a smell than it's better to tear down the connection?
refactor: update some code for clarity
refactor: clean up code for clarity
Dependencies are updated and I went ahead and added jsdocs and some comments across the project as I was reviewing things. Aside from the channel object pooling, that was already mentioned, I think this looks good. My thoughts on moving this forward are:
The reason I'd like to have this as a standalone from libp2p-mplex, is that in the future, we might indeed change that over to stream3/async iterators. However, that's a lot of work and is going to happen soon. I'd like to keep the benchmarks added in libp2p/js-libp2p-mplex#76, so that we can test the pull-stream implementation against stream3 before doing a full move in that direction. Can I get some feedback on this approach before moving that direction? @dryajov @alanshaw @vasco-santos @mkg20001 @daviddias ❤️ |
👍 on overall plan, implementing interface-stream-muxer also makes sense. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it would be good to have some quick benchmarks built with benchmark.js or fastbench to check if everything is working correctly. Also, we should check a) flamegraphs and b) bubblegraphs if there is any areas we could improve. These should also serve as a comparison between the two implementations. You might grab some of the code in https://github.com/mcollina/tentacoli/tree/master/benchmarks as an example.
this.sendMsg(data) | ||
next() | ||
}) | ||
}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The above creates a lot of closures. I'd recommend to refactor this logic into a top level or instance function to ease V8 optimizer job.
* Closes the channel with the given error | ||
* @param {Error} err Default: `'channel reset!'` | ||
*/ | ||
reset (err) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is often called destroy()
in Node lingo. Should this have a callback?
* | ||
* @param {Buffer} data | ||
*/ | ||
sendMsg (data) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't this have a callback?
this.queue(pool.slice(oldUsed, used)) // send header | ||
|
||
if (PULL_LENGTH - used < 100) { | ||
pool = Buffer.alloc(PULL_LENGTH) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would be bold and go for allocUnsafe
. We are writing 100% of the memory, minus bugs.
// Reading is done for this message, start processing it | ||
if (States.PARSING === state) { | ||
if (accumulating) { | ||
used += msg.copy(buffer, used) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's better to accumulate the buffer in an array and just concat them all. All this copying will come at a cost.
// is handling errors. In node 10+ if we dont catch this | ||
// error, it will be thrown. Perhaps channel shouldnt be | ||
// emitting errors? | ||
}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the Channel should be emitting errors, but those needs to be catched here and logged. A lot of channel erroring might be a smell than it's better to tear down the connection?
// Create a new stream | ||
case Types.NEW: { | ||
const chan = this._newStream(id, false, true, data.toString(), this._inChannels) | ||
setImmediate(() => this.emit('stream', chan, id)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why setImmediate
and not nextTick
?
} | ||
|
||
default: | ||
this.emit('error', new Error('Invalid message type')) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this might need to be deferred with nextTick
.
(Benchmarks could also come in a separate PR). |
Thanks for the feedback @mcollina. I made a couple small changes (nextTick and error logging). I am going to merge this and work on the performance and callback items you noted in a new PR that will include benchmarks and the |
UPDATE: This is now ready for review.
TODO:
Bellow are the accompanying PRs:
Perf:
I was able to shave off ~15 mins of the new implementations running the mega stress tests, the perf is now about the same (or a little better ;) ) than the stream based implementation.
Archive.zip
The attached zip contains a heap snapshot and a perf log taken with
node --prof --nologfile_per_isolate --logfile=xxxxxx.log --log-timer-events /private/var/folders/_r/6c6jf6m10kb3v9kt45qspw4w0000gn/T/v8profilerProxy.js
. The one thing that still irks me a bit is the excessive (compared to the prev implementation) GC time. I think we can improve that a lot by pooling theChannel
objects and reusing them, rather than creating new ones every time.Here is a screenshot from the perf log graph - GC is seems to take ~13% of the total time. (These graphs where generated using the attached logs, and webstorm perf and heap analyzers).
Keep having issues with this PR, use this instead of #1, #2, #3.