Skip to content

Commit

Permalink
Add WritableStream with customizable backpressure strategy.
Browse files Browse the repository at this point in the history
  • Loading branch information
domenic committed Nov 29, 2013
1 parent 261598a commit 7339d45
Showing 1 changed file with 68 additions and 12 deletions.
80 changes: 68 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ In extensible web fashion, we will build up to a fully-featured streams from a f
#### Readable Streams

- `BaseReadableStream`
- Has a very simple backpressure strategy, communicating to the underlying data source that it should stop supplying data immediately after it pushes some onto the stream's underlying buffer. (In other words, it has a "high water mark" of zero.)
- Has a very simple backpressure strategy, communicating to the underlying data source that it should stop supplying data immediately after it pushes some onto the stream's underlying buffer. In other words, it has a high water mark of zero.
- Support piping to only one destination.
- `ReadableStream`
- A higher-level API used by most creators of readable streams.
Expand All @@ -62,7 +62,7 @@ In extensible web fashion, we will build up to a fully-featured streams from a f
#### WritableStreams

- `BaseWritableStream`
- Has a very simple backpressure strategy, communicating that it is "full" immediately after any data is written (but becoming ready to write again after the asynchronous write completes).
- Has a very simple backpressure strategy, communicating that it is "full" immediately after any data is written (but becoming ready to write again after the asynchronous write completes). In other words, it has a high water mark of zero.
- `WritableStream`
- A higher-level API used by most creators of writable streams.
- Adds the ability to customize the buffering and backpressure strategy, overriding the basic one.
Expand Down Expand Up @@ -602,7 +602,7 @@ class BaseWritableStream {
[[error]](any e)
[[doClose]]()
[[doDispose]](r)
[[doNextWrite]]()
[[doNextWrite]]({ type, promise, data })
// Internal properties
Array [[buffer]] = []
Expand Down Expand Up @@ -648,7 +648,8 @@ In reaction to calls to the stream's `.write()` method, the `write` constructor
1. Set `this.[[writableState]]` to `"writable"`.
1. Resolve `this.[[writablePromise]]` with `undefined`.
1. Otherwise,
1. Call `this.[[doNextWrite]]()`.
1. Shift `entry` off of `this.[[buffer]]`.
1. Call `this.[[doNextWrite]](entry)`.
1. When/if `startedPromise` is rejected with reason `r`, call `this.[[error]](r)`.

##### get closed
Expand All @@ -663,10 +664,9 @@ In reaction to calls to the stream's `.write()` method, the `write` constructor

1. Let `promise` be a newly-created pending promise.
1. If `this.[[writableState]]` is `"writable"`,
1. Push `{ type: "data", promise, data }` onto `this.[[buffer]]`.
1. Set `this.[[writableState]]` to `"waiting"`.
1. Set `this.[[writablePromise]]` to be a newly-created pending promise.
1. Call `this.[[doNextWrite]]()`.
1. Call `this.[[doNextWrite]]({ type: "data", promise, data })`.
1. Return `promise`.
1. If `this.[[writableState]]` is `"waiting"`,
1. Push `{ type: "data", promise, data }` onto `this.[[buffer]]`.
Expand Down Expand Up @@ -745,11 +745,8 @@ In reaction to calls to the stream's `.write()` method, the `write` constructor
1. When/if `disposeResult` is rejected with reason `r`, call `this.[[error]](r)`.
1. Return `this.[[closedPromise]]`.

##### `[[doNextWrite]]()`
##### `[[doNextWrite]]({ type, promise, data })`

1. Assert: `this.[[buffer]]` is not empty.
1. Assert: `this.[[writableState]]` is `"waiting"` or `"closing"`.
1. Shift `{ type, promise, data }` off of `this.[[buffer]]`.
1. If `type` is `"close"`,
1. Assert: `this.[[writableState]]` is `"closing"`.
1. Call `this.[[doClose]]()`.
Expand All @@ -761,18 +758,77 @@ In reaction to calls to the stream's `.write()` method, the `write` constructor
1. Set `this.[[currentWritePromise]]` to `undefined`.
1. If `this.[[writableState]]` is `"waiting"`,
1. Resolve `promise` with `undefined`.
1. If `this.[[buffer]]` is not empty, call `this.[[doNextWrite]]()`.
1. If `this.[[buffer]]` is not empty,
1. Shift `entry` off of `this.[[buffer]]`.
1. Call `this.[[doNextWrite]](entry)`.
1. If `this.[[buffer]]` is empty,
1. Set `this.[[writableState]]` to `"writable"`.
1. Resolve `this.[[writablePromise]]` with `undefined`.
1. If `this.[[writableState]]` is `"closing"`,
1. Resolve `promise` with `undefined`.
1. If `this.[[buffer]]` is not empty, call `this.[[doNextWrite]]()`.
1. If `this.[[buffer]]` is not empty,
1. Shift `entry` off of `this.[[buffer]]`.
1. Call `this.[[doNextWrite]](entry)`.
1. Call `this.[[onWrite]](data, signalDone, [[error]])`.
1. If the call throws an exception `e`, call `this.[[error]](e)`.

Note: if the constructor's `write` option calls `done` more than once, or after calling `error`, or after the stream has been disposed, then `signalDone` ends up doing nothing.

### WritableStream

```js
class WritableStream extends BaseWritableStream {
// Adds a backpressure strategy argument.
constructor({
function start = () => {},
function write = () => {},
function close = () => {},
function dispose = close,
strategy: { function count, function needsMoreData }
})

// Overriden to take into account backpressure strategy
write(data)

// Overriden to take into account backpressure strategy.
// You can also think of this as part of the the constructor and write override.
[[doNextWrite]]({ type, promise, data })

// Internal properties
[[strategy]]
}
```
#### Properties of the WritableStream Prototype
##### constructor({ start, write, close, dispose, strategy })
1. Set `this.[[strategy]]` to `strategy`.
1. Call `super({ start, write, close, dispose })`.
##### write(data)
1. If `this.[[writableState]]` is `"writable"` or `"waiting"`,
1. Add `this.[[strategy]].count(data)` to `this.[[bufferSize]]`.
1. If `this.[[writableState]]` is `"writable"`,
1. Let `promise` be a newly-created pending promise.
1. If `ToBoolean(this.[[strategy]].needsMoreData(this.[[bufferSize]]))` is `false`,
1. Set `this.[[writableState]]` to `"waiting"`.
1. Set `this.[[writablePromise]]` to be a newly-created pending promise.
1. If `this.[[buffer]]` is empty,
1. Call `this.[[doNextWrite]]({ type: "data", promise, data })`.
1. Otherwise,
1. Push `{ type: "data", promise, data }` onto `this.[[buffer]]`.
1. Return `promise`.
1. Return `super(data)`.
#### Internal Methods of WritableStream
##### `[[doNextWrite]]({ type, promise, data })`
1. Subtract `this.[[strategy]].count(data)` from `this.[[bufferSize]]`.
1. Return the result of calling `BaseWritableStream`'s version of `this.[[doNextWrite]]({ type, promise, data })`.
## Helper APIs
### TeeStream
Expand Down

0 comments on commit 7339d45

Please sign in to comment.