Define how "push" interacts with highWaterMark on readable side (suspending of push) #12

Closed
lightsofapollo opened this issue Oct 30, 2013 · 4 comments

Comments

@lightsofapollo

Node's internal use of highWaterMark is mostly tied to the ability to call an explicit read method with an explicit number of bytes, triggered by a request for data.

To have sane piping I believe we need the ability to halt the flow of data on the readable side (if the writer is blocked we can attempt to pause the reader). This is where having some enforced convention seems useful:

My theoretical example of how this might work:

const stream = new ReadableStream({ highWaterMark: 1024 }, (push, done, error) => {
  var timerId;
  function spam() {
    // push returns a promise when the stream wants the producer to wait
    var defer = push(str2ab("why can't you come up with a better example"));

    // let's say you're a good person and wait for that promise
    if (defer) return defer.then(spam);

    timerId = setTimeout(spam);
  }

  spam();
});

In the case where the promise is ignored, should we also set a flag and then report it somehow?
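For what it's worth, the stream side of this idea could be sketched roughly as below. This is only an illustration under assumptions: createDeferringPush, drain, and the buffered counter are hypothetical names and not part of any proposed API, and chunks are assumed to expose byteLength. push returns undefined while the buffer is under the high water mark and a promise once it is reached; drain stands in for the consumer emptying the buffer.

```javascript
// Hypothetical helper: builds a push() that returns a promise once the
// buffered byte count reaches highWaterMark, and a drain() that resolves it.
function createDeferringPush(highWaterMark) {
  let buffered = 0;
  let pending = null; // outstanding "please wait" promise, if any
  let resume = null;  // resolver for that promise

  function push(chunk) {
    buffered += chunk.byteLength;
    if (buffered < highWaterMark) return undefined; // still room, keep going
    if (!pending) pending = new Promise((resolve) => { resume = resolve; });
    return pending; // producer should wait on this before pushing more
  }

  function drain() {
    // Stand-in for the consumer reading everything that was buffered.
    buffered = 0;
    if (resume) {
      const resolve = resume;
      resume = null;
      pending = null;
      resolve();
    }
  }

  return { push, drain, get buffered() { return buffered; } };
}

const sketch = createDeferringPush(1024);
const first = sketch.push(new Uint8Array(512));  // under the mark
const second = sketch.push(new Uint8Array(512)); // mark reached
second.then(() => console.log("producer may resume"));
sketch.drain();
```

Here the first push comes back as undefined and the second as a promise, which only settles after drain is called.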


@domenic
Member

domenic commented Oct 30, 2013

The existing API already provides for this (although I admit I don't have an example of using it yet). If push returns false, you should pause the underlying source of data until onRead is called again. In other words, it uses onRead being called as the signaling mechanism, instead of a new out-of-band promise. This gives the system much more flexibility to schedule calls to onRead when appropriate, perhaps multiple times, instead of only the zero or one times that a promise allows. In essence, this is the difference between pull streams and push streams: you are creating a push stream, which pushes data at the consumer and opts in to information about how it is being consumed; the existing API is for a pull stream, where the primary mode of interaction is through notification that you are being pulled from.

In other words, I do not understand what this proposal offers over the API in the readme. I also don't understand if you are abandoning the idea of onRead being called multiple times, or if you're giving it a different meaning, or what.
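A rough sketch of the "pause until onRead is called again" convention, seen from the source side. Everything here is an assumption made for illustration: FakeSocket and makeOnRead are hypothetical names, the done callback is taken from the (push, done, error) signature used earlier in the thread, and the stream is replaced by a hand-written pull function whose push accepts two chunks per call.

```javascript
// Hypothetical push-style source that can be paused and resumed.
class FakeSocket {
  constructor(chunks) {
    this.chunks = chunks.slice();
    this.flowing = false;
    this.ondata = null;
    this.onend = null;
  }
  pause() {
    this.flowing = false;
  }
  resume() {
    this.flowing = true;
    while (this.flowing && this.chunks.length > 0) {
      this.ondata(this.chunks.shift());
    }
    // Only signal the end if nobody paused us along the way.
    if (this.flowing && this.chunks.length === 0) this.onend();
  }
}

// Builds an onRead function: each call resumes the source, and the source
// is paused again as soon as push reports false.
function makeOnRead(socket) {
  return function onRead(push, done) {
    socket.ondata = (chunk) => {
      if (push(chunk) === false) socket.pause();
    };
    socket.onend = done;
    socket.resume();
  };
}

const socket = new FakeSocket(["a", "b", "c", "d", "e"]);
const onRead = makeOnRead(socket);
const received = [];
let ended = false;

// Stand-in for the stream: a push that has room for two chunks per pull.
function pull() {
  let room = 2;
  onRead(
    (chunk) => { received.push(chunk); room -= 1; return room > 0; },
    () => { ended = true; }
  );
}

pull();
const countAfterFirstPull = received.length;
const pausedAfterFirstPull = !socket.flowing;
pull();
pull();
console.log(received.join(""), ended);
```

The point of the sketch is that no extra promise is needed: the next call to onRead is itself the signal to resume.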

@lightsofapollo
Author

Re-reading the readme, I see I was mistaken about the constructor. As I understand it now, the function passed to the constructor (toRead) is invoked for each call to .read.

After reading the various long threads about promises, my first thought was that this function would only be called once.


To be clear, then: toRead functions much the same way as _read, and doing something like this would work?

// (but with ES6 and extending ReadableStream)
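function MyReader() {
  ReadableStream.call(this);
}

// Inherit from ReadableStream so instances get its methods
MyReader.prototype = Object.create(ReadableStream.prototype);

MyReader.prototype.onRead = function(push, done, error) {};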

Assuming the above is true, why do we need the function on the constructor?

@domenic
Member

domenic commented Oct 30, 2013

onRead is not a method of streams; it is a function you pass to a stream constructor to tell it how that stream reacts to read calls. A sample implementation might be something like:

class ReadableStream {
  constructor(options, onRead) {
    this._options = options;
    this._onRead = onRead;
    this._internalBuffer = new ArrayBuffer(options.highWaterMark);
  }

  read() {
    this._onRead(data => {
      pushInto(this._internalBuffer, data);
      // false tells the source to pause until onRead is called again
      return hasRoomLeft(this._internalBuffer, this._options);
    }, ...);
    process.nextTick(() => this._internalBuffer = new ArrayBuffer(this._options.highWaterMark));

    if (hasData(this._internalBuffer)) {
      return Promise.resolve(this._internalBuffer);
    }

    return waitForData(this._internalBuffer).then(() => this._internalBuffer);
  }
}
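A runnable variant of the sample above, offered only as a sketch under assumptions. SketchReadableStream is a hypothetical name; the undefined helpers (pushInto, hasRoomLeft, hasData, waitForData) are replaced by a plain array of chunks, a byte counter, and a list of waiting resolvers; the done and error callbacks are left out; and push returns true while there is room, following the "if push returns false, you should pause" convention from earlier in this thread.

```javascript
class SketchReadableStream {
  constructor(options, onRead) {
    this._options = options;
    this._onRead = onRead;
    this._chunks = [];  // buffered chunks, in arrival order
    this._bytes = 0;    // total byteLength of buffered chunks
    this._waiters = []; // resolvers for reads that found no data
  }

  _push(chunk) {
    this._chunks.push(chunk);
    this._bytes += chunk.byteLength;
    const hasRoomLeft = this._bytes < this._options.highWaterMark;
    // Hand the data to a read() that was waiting, if there is one.
    const waiter = this._waiters.shift();
    if (waiter) waiter(this._take());
    return hasRoomLeft;
  }

  _take() {
    const out = this._chunks;
    this._chunks = [];
    this._bytes = 0;
    return out;
  }

  read() {
    // Every read() call invokes onRead, which may push synchronously or later.
    this._onRead((chunk) => this._push(chunk));
    if (this._chunks.length > 0) return Promise.resolve(this._take());
    return new Promise((resolve) => this._waiters.push(resolve));
  }
}

const pushResults = [];
const stream = new SketchReadableStream({ highWaterMark: 8 }, (push) => {
  pushResults.push(push(new Uint8Array(4))); // 4 of 8 bytes used
  pushResults.push(push(new Uint8Array(4))); // high water mark reached
});

const pending = stream.read();
pending.then((chunks) => console.log("chunks read:", chunks.length));
```

With a high water mark of 8 bytes, the first 4-byte push reports room left and the second does not, so a well-behaved source would stop pushing until the next read.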

@domenic
Member

domenic commented Nov 4, 2013

The new BaseReadableStream makes it somewhat clearer how this works, especially the onRead-type interaction we were discussing. But I will leave this open until I finish writing ReadableStream and showing how a configurable backpressure strategy with a given high water mark works.

domenic added a commit that referenced this issue Nov 18, 2013
This ties them to more concrete scenarios of a raw socket and raw file handle API. In doing so, we move the specific API surfaces over into these examples, fixing #42. We also make explicit how the backpressure strategy will work, closing #12.
@domenic domenic closed this as completed Nov 18, 2013