
streams2 #1681

Closed

wants to merge 9 commits into from

Conversation

@mikeal commented Sep 10, 2011

  • Make stream.Stream a read/write stream with proxy methods by default.
  • Make readable streams in lib use stream.ReadStream.
  • Add stream.createFilter.

@mikeal commented Sep 10, 2011

this isn't ready to merge yet. too many tests in master are broken right now for me to be confident that this change doesn't also break something. once there are fewer failures in master we can look at merging this.

for now, we should have a discussion about the API. it also needs a good test for the createFilter API.
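For reference, a hedged sketch of the kind of createFilter test being asked for; the per-chunk transform signature here is an assumption about this branch's API, not a confirmed contract:

var assert = require('assert')
var stream = require('stream')

// assumed signature: createFilter(fn) wraps a per-chunk transform
// in a read/write stream (hypothetical until the branch API settles)
var upper = stream.createFilter(function (chunk) {
  return chunk.toString().toUpperCase()
})

var out = []
upper.on('data', function (c) { out.push(c) })
upper.on('end', function () {
  assert.equal(out.join(''), 'HELLO')
  console.log('filter test ok')
})
upper.write('hel')
upper.end('lo')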

@mikeal commented Sep 10, 2011

now that i think about it, the read streams in lib are actually bindings to a lower-level interface that writes to them; they should probably still inherit from stream.Stream so that we can support buffering.

@felixge commented Sep 15, 2011

Is this in good shape for review now?

@mikeal commented Sep 16, 2011

i don't want to attempt to merge it until the tests are passing in master, but we should start picking apart the code that is already in there now.

i also want to remove the multi-source support so that we can add error propagation.

@TooTallNate

FWIW, my stream-stack module implements this same kind of "proxy by default" behavior (so I'm glad to see that it was a good idea after all :p).

The point of that module was to write Stream "middle-men", that make up your protocol stack; so this new createFilter API is probably a similar concept there as well, but with a different API.

In any case, I'm just interested to see where this goes. I'll have some technical comments shortly.

@mikeal commented Sep 22, 2011

TODO

  • remove multi-reader logic from pipe()
  • add error propagation

TODO proposals

  • remove close
  • ensure all readstreams in core emit('end'), even after an error
  • add destroy() ( @isaacs proposal )
  • provide a way to unhook a pipe() that does not propagate (calling end() will propagate)

@dominictarr

"always emit 'end',
and that being what breaks up the chain: +1

I'm on the fence about the proxy buffering feature. What about putting that in a separate stream, so you pipe through the buffer and only use it when you want that feature?
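To make that concrete, a minimal sketch of such a pass-through buffer under the old-style Stream API (the class and its queueing details are assumptions, not code from this PR):

var Stream = require('stream').Stream
var util = require('util')

// minimal pass-through buffer: queue 'data' while paused, drain on resume
function BufferedStream () {
  Stream.call(this)
  this.readable = this.writable = true
  this._queue = []
  this._paused = false
  this._ended = false
}
util.inherits(BufferedStream, Stream)

BufferedStream.prototype.write = function (chunk) {
  this._queue.push(chunk)
  this._flush()
  return !this._paused
}

BufferedStream.prototype.pause = function () { this._paused = true }

BufferedStream.prototype.resume = function () {
  this._paused = false
  this._flush()
}

BufferedStream.prototype.end = function (chunk) {
  if (chunk) this._queue.push(chunk)
  this._ended = true
  this._flush()
}

BufferedStream.prototype._flush = function () {
  while (!this._paused && this._queue.length) {
    this.emit('data', this._queue.shift())
  }
  if (this._ended && !this._queue.length && !this._endEmitted) {
    this._endEmitted = true
    this.emit('end')
  }
}

You would then opt in with src.pipe(new BufferedStream()).pipe(dest) only where buffering is actually wanted.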

createFilter: I see a few problems here. The filter will emit 'end' immediately after it receives it, no matter what the user's cb function is doing, and cb looks like a callback but doesn't take an error parameter.

I think that this: https://github.com/dominictarr/event-stream/blob/master/index.js#L116-177
is a more useful stream helper function: it takes an ordinary async function and wraps it in a stream. The user can pause the stream by returning false, and when 'end' occurs, the stream will not propagate it downstream until all the currently active async functions have called back.
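A rough, simplified sketch of that shape (in-flight counting only; the return-false pause behavior in the linked code is omitted):

var Stream = require('stream').Stream

// wrap an ordinary async function in a read/write stream; 'end' is
// held back until every in-flight callback has returned
function mapStream (asyncFn) {
  var s = new Stream()
  s.readable = s.writable = true
  var inflight = 0
  var ended = false

  s.write = function (chunk) {
    inflight++
    asyncFn(chunk, function (err, data) {
      inflight--
      if (err) return s.emit('error', err)
      s.emit('data', data)
      if (ended && inflight === 0) s.emit('end')
    })
    return true
  }

  s.end = function () {
    ended = true
    if (inflight === 0) s.emit('end')
  }

  return s
}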

@mjackson

A few notes about this code:

  • Calling pause should automatically trigger buffering, imo. Otherwise you will continue to get data events while paused.
  • In addition to destroy there should also be a destroySoon(callback) API for when there is still buffered data to emit.
  • When writing data using end you should be careful to make sure it is actually emitted before emitting the end event.

@isaacs commented Sep 24, 2011

I've been discussing this stuff a bunch with @mikeal and @ry. What we need is a spec that is complete enough to cover all of node-core's use cases (particularly: tcp, tls/ssl, http, zlib/crypto filters, file streams), and simple enough to be understandable and extendable by userland programs.

IMO, after windows support, this is the most important bit of low-hanging fruit available to us.

One of the biggest problems with this api is that we have far too many ways to say "done", and there's very little consistency. destroySoon is particularly egregious, and must be destroyed as soon as possible.

@mikeal commented Sep 25, 2011

One general comment before i reply to people individually.

We need to separate the concerns between Streams in core and Streams that will be created by userland programs. It's unlikely that a userland program will write a new stream around a file handle; that's an edge case at best.

I believe that we can simplify the API greatly for node developers if we remove from the public spec the finer points that only really relate to streams we're implementing.

@mikeal commented Sep 25, 2011

@dominictarr

I implemented a BufferedStream object; it's available here.

The problem with the BufferedStream approach is this.

function (req, resp) {
  var b = new BufferedStream()
  req.pipe(b)
  talkToRedis(function (e, result) {
    b.pipe(request(result.url + req.url)).pipe(resp)
  })
}

Looks fine, but it's not. The BufferedStream breaks capability detection: all of a sudden request is unaware that its input is an http request. The only thing you could do would be to use a Proxy or lots of getters to proxy the attribute access, which would be really painful, fairly slow, and confuse the shit out of everyone.

You have a good point about createFilter but it's not intended to handle every use case. It's still going to be faster to write a new stream object than to use createFilter. The idea is to handle 90% of the use cases with a very simple API and defer the other use cases to creating a new full Stream object. An alternative API that relies on return values or continuation passing is easily buildable in userland but not as simple as createFilter.

@mjijackson

I had the same conversation with @felixge about using pause() for buffering. Here's the thing: we need to separate the notion of internal data buffers from messaging the client to stop sending data (which incurs a roundtrip to get it to start sending data again).

Currently, pause() means "tell the client to pause". I think we should leave it that way. We need another way to say "hold on to data events and don't tell the client to stop sending yet". If we say that pause() calls buffer() then we're in another bad place where there are two methods that do something similar but not the same (see end(), close(), destroy(), destroySoon()); the confusion caused by this ensures that people will mostly call the wrong thing.

Also, it's better to not buffer data events when write() returns false (the remote end cannot accept data). The pause() call is going to take an RTT to get the client to stop sending data. If we keep pushing that data through the pipe() chain we'll have it in the most efficient place possible, right next to the file descriptor that asked us to stop sending data. In the future we can optimize this to push that data before we emit("drain") and avoid unnecessary roundtrips to the client when working with very slow writers.

destroySoon might be a good idea for a few core streams but not for any userland streams, so I don't want it in the spec. What we decide to do for files and sockets should be separated from what we want userland streams to look like. Also, the "callback on write" is a total lie. We don't actually know when data is fsync'd to disk. I find it a better policy to not make API available where it is actually an outright lie :)

end(chunk) should NOT emit a "data" event. it should call this.write(chunk) before emitting end :)
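In other words (a minimal sketch; ExampleStream is illustrative):

var Stream = require('stream').Stream
var util = require('util')

function ExampleStream () {
  Stream.call(this)
  this.readable = this.writable = true
}
util.inherits(ExampleStream, Stream)

ExampleStream.prototype.write = function (chunk) {
  this.emit('data', chunk)      // pass-through, for illustration
  return true
}

ExampleStream.prototype.end = function (chunk) {
  if (chunk) this.write(chunk)  // route through write(); don't emit 'data' directly
  this.emit('end')
}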

@dominictarr

I wholeheartedly agree with the small core, big userland principle.

the one thing that I really need my userland streams to do is be able to handle errors if necessary.

I am using BufferedStream!
though, I don't understand why the upstream needs to know what the downstream is?
I thought it only needs to know what rate it should send?

In many of the things that I want to make userland streams for, bleeding-edge performance is not quite as important as the fact that the stream API is scalable: if you are using the same API for jobs as you do for IO, breaking out part of your application into another process, or another server, becomes pretty trivial. I'm confident that you guys are doing a great job on core IO streams, but userland is what I am interested in.

@isaacs commented Sep 25, 2011

@mikeal,

> I believe that we can simplify the API greatly for node developers if we remove from the public spec the finer points that only really relate to streams we're implementing.

While I agree with you in principle here, and we definitely need to get to a minimal contract that is easy to understand, I absolutely disagree that these "finer points" only apply to streams implemented in core.

Anything that goes through any kind of binding layer will have a lot of the same concerns, and will almost certainly have to handle cleanup. So there's your destroy/close/whatever use case. In fact, file streams are one of the simpler types of streams to clean up, since they're strictly uni-directional and have a very clear way to shut them down by calling close(fd).

The difference between a duplex stream and a filter stream is only a gentleman's agreement that ending the input will end the output in some reasonable amount of time. We absolutely need some way to express "I'm ending the input, and forcibly destroying anything and everything about this stream, right now."

If you have cleanup, and you have forcible shutdown, then you have everything that's needed to support node-core streams, and it turns out those things are also necessary in many userland cases as well.

@isaacs commented Sep 26, 2011

Here's a spec that I think can work for node-core and is clear enough to be used for userland programs: https://gist.github.com/1241393

I don't think the .buffer(n) method is out of the question, but we really need to organize the current functionality before adding anything new.

What do you all think?

@dominictarr

I approve, except for dest.on('error', e) ----> src.on('error', e).

maybe dest.on('error', e) ----> src.error(e)

or if(!dest.handleErrors) dest.on('error', e) ----> src.error(e)

It needs to be possible to handle errors for Filter and Duplex streams; the head of the stream may or may not be the right place to do that.
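Sketched out, the wiring options above look something like this (error() and handleErrors are the proposed additions, not existing Stream API):

// inside a hypothetical pipe() implementation
function wireErrors (src, dest) {
  dest.on('error', function (e) {
    if (dest.handleErrors) return              // dest opted to handle it itself
    if (typeof src.error === 'function') {
      src.error(e)                             // proposed: give src a handle-or-rethrow hook
    } else {
      src.emit('error', e)                     // current proposal: blind re-emit
    }
  })
}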

@mikeal commented Sep 26, 2011

@isaacs i think we can add buffer as we clean up the current functionality; it's already written :) i agree it's a good idea in general to clean up what we have and stay away from too many additional extensions, but I just have this rather large issue right now and have already gone through the trouble of fixing it :)

@dominictarr i'm mulling over your error comment.... it's a very new API but it is definitely more consistent with how we currently deal with reverse event propagation.

@isaacs the reason I made this a branch and a pull request is to keep the conversation anchored to something a little more concrete. we've endlessly discussed the stream spec for a year and have made relatively little progress. I'm going to merge in your new Stream spec and replace the Stream spec in the docs in my branch so that it can become part of the conversation, and i'll probably start implementing part of it as well.

@dominictarr i owe you, and everyone really, a blog post about capability detection in stream pipes so that you all can understand what the fuck i'm talking about.

@mikeal commented Sep 26, 2011

@isaacs

what do you think about removing the distinction in the spec between a readable and a writable stream?

it's really confusing for people implementing readable/writable streams. we have these direct method-to-event correlations that we can't actually outline in the spec because we've separated the two objects.

@mikeal commented Sep 26, 2011

@isaacs

what do you think about removing all the callback arguments, for now.

do we have a really strong userland case for them at the moment? it's a lot of extra code, and a fairly large new feature. if the primary goal is to clean up what is there, I think we should tackle this in a future release.

@isaacs commented Sep 26, 2011

> we've endlessly discussed the stream spec for a year and have made relatively little progress.

That's because we were discussing it, and not writing anything down, and then writing a bunch of code based on our meatware records of the discussion, and then dealing with bugs in said code, and then repeating.

There are enough people now with enough experience dealing with these things that we should be able to understand the issues. The best way to build an interoperable thing is for people with that understanding to sit down and write out a spec in english, and then adjust it when the code doesn't fit it. There are too many edge cases in real-life streams to do it any other way.

> what do you think about removing the distinction in the spec between a readable and a writable stream?

Well, it's pretty significant. If a stream is readable, then data comes out of it, and if it's writable, then data goes into it. Knowing which set of capabilities it has is important and helpful.

> what do you think about removing all the callback arguments, for now.

A write() callback is absolutely a necessity in core. However, I'd be ok with removing it from the spec. Stream.pipe doesn't rely on it.

Same for the callbacks to end() and flush(). Basically, end() is this:

write(chunk, function () {
  this.ended = true;
  if (cb) cb()
})

and flush() is:

write(cb)

If you can construct the write() function to make the chunk arg optional, then you get flush for free. Plus, it gives you a place for implementation-specific logic: Zlib would set the flush argument to Z_SYNC_FLUSH, an fs.WriteStream could call fsync or whatever, etc.

> for most streams, what does this mean? it doesn't mean fsync because we really don't know. do we do it after the relevant event is emitted? if that's the meaning then we should call it out clearly in the docs.

You need to stop focusing on filters exclusively.

Remember, for a duplex stream, there is not necessarily any event that corresponds to a write() call. In fact, even for some filters, this is the case. A deflate filter might get several megabytes written to it, in tiny chunks, and then emit a single data event. A sha filter will take an unlimited number of write() calls, and only emit a single 40-byte data chunk when you call end() on it.

The write cb simply means, "Whatever it is that write means for this particular stream, it's done now. The data was written to disk, or flushed from the system's IO buffer, or handed off to zlib, or encrypted with the cipher stream, or added to the sha hash computation, or turned into a chicken, or whatever."
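The sha filter described above, sketched under the old Stream API (ShaStream is illustrative):

var Stream = require('stream').Stream
var util = require('util')
var crypto = require('crypto')

// many write() calls in, one 40-byte hex chunk out at end()
function ShaStream () {
  Stream.call(this)
  this.readable = this.writable = true
  this._hash = crypto.createHash('sha1')
}
util.inherits(ShaStream, Stream)

ShaStream.prototype.write = function (chunk, cb) {
  this._hash.update(chunk)   // "done" here just means "added to the hash"
  if (cb) cb()
  return true
}

ShaStream.prototype.end = function () {
  this.emit('data', this._hash.digest('hex'))
  this.emit('end')
}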

> [allowHalfDuplex] is this relevant for anything other than net and tls?

It's relevant for Socket.io's userland streams, or anything that dnode talks over. I don't think this will be a success unless it can support filters, one-way streams, and duplexes alike. We need to stop asking whether something is "only" needed for net and tls, and instead ask whether something is needed for duplexes, filters, or one-way streams. If the answer is "yes", then it's needed for Streams, period.

But, "allowHalfDuplex" is a wonky networking term that people might not get. Maybe destroyOnEnd or something? I dunno.

Basically, the semantic here is: "For this two-way stream, should it be forcibly destroyed once the last bit of data is finished being written (which requires a write() cb, btw), or should it stay open to continue getting data events."
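For comparison, net already exposes this knob as allowHalfOpen:

var net = require('net')

// keep our writable side open after the remote end sends its FIN
var server = net.createServer({ allowHalfOpen: true }, function (sock) {
  sock.on('end', function () {
    sock.end('last words\n')   // still writable here; this closes our side
  })
})
server.listen(8124)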

> [end must be emitted] I agree with this, but we currently allow userland to break this with the {end:false} argument in pipe().

Right. We just make it so that the pipe itself doesn't call .end() on the writer when the reader emits end, but readable streams still need to emit end, or else pipe actually won't work, since it won't know when to clean up the listeners.

Here's an example that should probably be a section in the "Using streams" part:

// frame the file in a header and footer
header.pipe(output, { end: false })
header.on("end", function () {
  body.pipe(output, {end: false})
})
body.on("end", function () {
  footer.pipe(output)
})

You could also build something like this, where a tcp socket connection gets multiple files piped to it as the user requests them:

var net = require("net")
var fs = require("fs")

net.createServer(function (sock) {
  var buf = ""
  var files = []
  var sending = false
  sock.on("data", function (c) {
    buf += c
    buf = buf.replace(/\n+/, "\n")
    if (buf.indexOf("\n") !== -1) sendFiles()
  })
  function sendFiles () {
    var f = buf.split("\n")
    buf = f.pop()              // keep any partial line for next time
    files.push.apply(files, f)
    sendFile()
  }
  function sendFile () {
    if (sending) return        // one file at a time
    var file = files.shift()
    if (!file) return
    sending = true
    var s = fs.createReadStream(file)
    s.pipe(sock, {end: false}) // keep the socket open for the next file
    s.on("end", function () {
      sending = false
      sendFile()
    })
  }
})

If the read streams didn't emit "end", then that sort of thing wouldn't be possible.

> destroy() should not emit("end"), at least in userland streams.

Then that breaks the guarantee that read streams are guaranteed to emit "end" eventually.

> If that is the case A knows it's a duplex stream and that when a circular error occurs it's the right place to handle throw logic.

How? It's a synchronous call in pipe(). Are you suggesting that A override emit() to not actually emit the error event if it's already in the middle of emitting an error event? With error propagation, it's pretty easy to encounter this.

var EventEmitter = require("events").EventEmitter
var x = new EventEmitter()
x.on("error", function (e) {
  x.emit("error", e)   // re-entrant emit: recurses until the stack blows
  throw e
})
x.emit("error", new Error("Not a RangeError"))

It seems to me that this is actually something that would have to be handled by the EventEmitter class, or at least the Stream base class. We could also maybe get around it by using @dominictarr's suggestion of replacing the emit(error) --> emit(error) with an error(e) method that can be made idempotent. But that really doesn't feel elegant at all. What we need is some way to stop propagation through the pipe chain once the error is handled, and there's no way to do that right now.
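For the record, the idempotent error(e) idea would look roughly like this (a sketch, not existing API):

var Stream = require('stream').Stream

Stream.prototype.error = function (e) {
  if (this._handlingError) return   // already propagating: break the cycle
  this._handlingError = true
  this.emit('error', e)
  this._handlingError = false
}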

Here's a non-exhaustive list of constraints that I've got so far on this API:

  1. readable streams (ie, anything that emits "data") must emit "end" when it is known that no more "data" is going to be emitted, no matter the reason, and then not emit "data" ever again.
  2. This has to work properly, and be guaranteed to never throw range errors: net.createServer(function (s) { s.pipe(s) })
  3. There must be a way to forcibly destroy a duplex stream, so that data may be lost.
  4. write() and end() must never throw, except in the case where write() is called after end(), in which case it MUST throw (not emit("error"), because that is an error in your program, not in the underlying stream).

So, the question is, what is the minimum change to the API that gets us to those constraints?
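As a hedged aside, constraints 1 and 4 could be exercised with a small test like this (makeStream is a hypothetical factory for the stream under test):

var assert = require('assert')

function checkConstraints (makeStream) {
  var s = makeStream()
  var ended = false
  s.on('end', function () { ended = true })
  s.on('data', function () {
    assert(!ended, 'constraint 1: no data after end')
  })
  s.end()
  assert.throws(function () {
    s.write('x')                // constraint 4: write() after end() must throw
  })
}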

@ry commented Sep 26, 2011

> we've endlessly discussed the stream spec for a year and have made relatively little progress.

> That's because we were discussing it, and not writing anything down, and then writing a bunch of code based on our meatware records of the discussion, and then dealing with bugs in said code, and then repeating.

Or rather the interface is simply too complex and no one feels good about setting it in stone.

@mjackson

I agree with @ry that the interface is very complex. The approach that @isaacs is taking is a sound one. We'll need to run the spec by a few different use cases to make sure it is complete though.

I've done quite a bit of work on Strata's BufferedStream implementation this weekend and today. For the purposes of Strata (a web framework) it works very well. I have already implemented subclasses for both gzip and jsonp encoding which are very small and which help prove the usefulness of the approach I'm taking.

@mikeal commented Oct 6, 2011

Ok, this is where I think we're at with the encoding issues.

write(chunk, encoding) can actually break utf8.

treating setEncoding as a mutation on streams that just call buffer.toString() also breaks utf8 in some cases.

it seems like the best thing to do is remove encoding as a param to write, stick it on readable streams, and propagate the call through the pipe chain with a "setEncoding" event in filter streams.
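A sketch of that proposal (the 'setEncoding' event is the new mechanism being floated here, not an existing API; MyReadStream is illustrative):

var Stream = require('stream').Stream
var util = require('util')
var StringDecoder = require('string_decoder').StringDecoder

function MyReadStream () {
  Stream.call(this)
  this.readable = true
}
util.inherits(MyReadStream, Stream)

// decode locally, then announce so filter streams can re-emit the
// call downstream through the pipe chain
MyReadStream.prototype.setEncoding = function (enc) {
  this._decoder = new StringDecoder(enc)
  this.emit('setEncoding', enc)
}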

@mikeal commented Oct 6, 2011

@TooTallNate

streams will mutate data, that is expected. requiring the mutation to only encode into buffers and to always hold the right number of bytes to decode properly isn't acceptable for authors of those streams.

@dominictarr

yeah, I'm with mikeal on this one. a stream needs to know what the upstream thought the encoding is, but it is possible that a filter stream may change that, so if there is encoding metadata... ohh, now I've done it.

what about stream metadata?

http headers also seem like stream metadata, or am I crazy?

@isaacs commented Oct 6, 2011

Encoding needs to be handled on a case-by-case basis. We should expose a filter interface that can safely decode buffers to strings according to an encoding.

Encoding needs to be kept apart from the Stream interface.

@polotek commented Oct 8, 2011

Perhaps we should start working on some concrete test cases. "buffer.toString() breaks utf-8". Seems pretty simple to mock up. "streams need to know about upstream encoding". Probably. What does that look like?

I think you guys are on the right track with an open spec + a developing implementation. Running code that exercises some of these points of discussion would complete the trifecta, no? I'll try to help out soon.

@koichik commented Oct 18, 2011

How about a string encoder/decoder as a filter?

https://gist.github.com/1295138

example:

fs.createReadStream('input.txt.gz')
    .pipe(zlib.createGunzip())
    .pipe(utf8Decoder()) // buffer to string
    .pipe(new RegExpFilter(/node.js/i, 'node'))
    .pipe(utf8Encoder()) // string to buffer
    .pipe(zlib.createGzip())
    .pipe(fs.createWriteStream('output.txt.gz'));
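The gist itself isn't inlined here, but a utf8Decoder along these lines can be built on require('string_decoder') so multi-byte characters survive chunk boundaries (a hedged sketch, not the gist's exact code):

var Stream = require('stream').Stream
var StringDecoder = require('string_decoder').StringDecoder

function utf8Decoder () {
  var s = new Stream()
  s.readable = s.writable = true
  var decoder = new StringDecoder('utf8')
  s.write = function (buf) {
    var str = decoder.write(buf)       // holds back partial characters
    if (str.length) s.emit('data', str)
    return true
  }
  s.end = function (buf) {
    if (buf) s.write(buf)
    s.emit('end')
  }
  return s
}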

@dominictarr

+1

@felixge commented Oct 19, 2011

+1 A filter stream seems very sensible here.

@mikeal commented Oct 19, 2011

I think we already agreed that setEncoding needs to stay in.

In fact, i think we have agreement that setEncoding stays in and the encoding argument to write goes away.

Doing everything with filters is problematic the same way buffering with filters is problematic. It's too common a case and is likely to be needed as a feature on all core streams, so it'll break capability detection.

@mikeal commented Oct 19, 2011

Ok, time to trim this down to something we might merge before 0.6.

@isaacs what are your thoughts?

I would love to remove close() before this release but I doubt that we have time to fix all the core streams. I would like the new passthrough Stream, createFilter, and .buffer() to go in; they are implemented, although we do need tests.

Also, do we have time to ensure that write() and end() throw on error? I'd really like to get that in.

@felixge commented Oct 19, 2011

> I think we already agreed that setEncoding needs to stay in.

Fine with me, but I'd still love StreamDecoder to become public and act as a stream filter; it would be a nice API.

@isaacs commented Oct 20, 2011

As cute as it would be to use filters to decode/encode buffers to strings (and it wouldn't be hard to do using require("string_decoder")) I think we need to keep the current api relatively stable wrt the encoding arguments.

Close needs to remain in, at least for TCP streams, otherwise there's no way to say "end my side, but as soon as that's done, I'm going to destroy it, so don't bother calling the shutdown syscall".

Also, we need to work out the issue with cyclical streams in order for error propagation to not be troublesome.

At the very least, even if we make no changes to any of the core streams apis, we need to fix it so that http/https streams can never ever throw on write() or end(), because that noise is just plain annoying.

@mikeal commented Oct 20, 2011

On Oct 19, 2011, at 8:26 PM, "Isaac Z. Schlueter" <reply@reply.github.com> wrote:

> At the very least, even if we make no changes to any of the core streams apis, we need to fix it so that http/https streams can never ever throw on write() or end(), because that noise is just plain annoying.

never throw?

I thought we wanted the opposite, to throw when the stream was already ended?


@koichik commented Oct 20, 2011

> At the very least, even if we make no changes to any of the core streams apis,

Currently tls does not (cannot) emit the 'end' event (#1795, #728); we have to fix that too. I think that part of the cause is that tls does not support allowHalfOpen=true. https.Server needs it.

@isaacs commented Oct 20, 2011

@mikeal oh, sorry, yes, it should throw if you .write() after you .end(), but this thing where it just throws randomly if the underlying socket dies, that sucks a lot.

@mikeal commented Oct 20, 2011

@isaacs if the socket is closed, because it's dead, don't we want it to throw if you attempt to write to it?

@seebees commented Nov 9, 2011

If the Stream throws on .write() (because the underlying socket dies) then you need to have the logic handling this unfortunate event in the .write() method. I don't think that is the correct place to force an implementer to deal with a broken socket or any other kind of runtime problem. Better to buffer the write, return false, and emit 'error'.

The case of a .write() after .end() is more likely a design-time problem, so throwing seems to me to be the correct thing to do.

Just my $.02. But with inflation these days...
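Sketched, with illustrative names (SocketishStream and _broken are stand-ins, not real API):

var Stream = require('stream').Stream
var util = require('util')

function SocketishStream () {
  Stream.call(this)
  this.writable = true
  this._pending = []
  this._broken = false
  this.ended = false
}
util.inherits(SocketishStream, Stream)

SocketishStream.prototype.write = function (chunk) {
  if (this.ended) throw new Error('write after end')   // design-time bug: throw
  if (this._broken) {
    this._pending.push(chunk)                          // buffer rather than lose data
    this.emit('error', new Error('underlying socket is gone'))
    return false                                       // runtime problem: signal, don't throw
  }
  this.emit('data', chunk)                             // stand-in for the real write
  return true
}

SocketishStream.prototype.end = function () {
  this.ended = true
  this.emit('end')
}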

@mojodna commented Dec 2, 2011

Netty's approach to encoding and decoding is possibly worth looking at. For example, transparent encoding of input to strings merely involves adding a StringEncoder to the pipeline:

pipeline.addLast("stringEncoder", new StringEncoder(CharsetUtil.UTF_8));

Similarly, decoding a stream uses a StringDecoder:

pipeline.addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8));

Clarifying and enhancing the stream spec would also make it easier to build protocol encoders and decoders while shielding app code from their complexity.

ChannelPipeline appears to be the place to look for more details.

@koichik commented Feb 7, 2012

Related issues:

@JayBeavers

Added a comment to the gist design doc:

> bool write(chunk, callback=null)
>
> The base class write method emits a data event with the provided chunk, calls the callback function if supplied, and returns true.
>
> If called after end(), then MUST throw. Otherwise, MUST NOT throw.

There are types of streams, primarily communications like sockets or serial ports, where an error condition such as a lost connection or a reset remote device is not detected until the write is attempted. In this case, imho, write should throw and then be followed by an end.

@JayBeavers

In reading through the streams2 spec, I don't see any guidance on how a stream's ctor or open event should work. Some stream types, such as node-serialport and socket connections, have an async open and shouldn't be written to or read from until the open has completed. How do you feel this should fit into the streams2 spec? See issue #60 in node-serialport for related discussion.

@dominictarr

you can pretty much write to every writable stream as soon as it's created. it would be better to buffer the events, or to create the stream with an async function.
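The buffer-the-events option might look like this (a sketch; openFn and writeFn stand in for the real resource):

var Stream = require('stream').Stream

function bufferUntilOpen (openFn, writeFn) {
  var s = new Stream()
  s.writable = true
  var queue = []
  var open = false
  openFn(function (err) {
    if (err) return s.emit('error', err)
    open = true
    queue.splice(0).forEach(writeFn)  // flush everything queued while opening
    s.emit('drain')
  })
  s.write = function (chunk) {
    if (!open) {
      queue.push(chunk)
      return false                    // backpressure while the resource opens
    }
    writeFn(chunk)
    return true
  }
  return s
}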

@mikeal closed this Jul 29, 2012
@mikeal commented Jul 29, 2012

we're going down a very different road for 0.8 streams. closing this out.

@reconbot

Where's the discussion on that?
