Skip to content
This repository has been archived by the owner on Aug 11, 2020. It is now read-only.

Commit

Permalink
src: enable StreamPipe for generic StreamBases
Browse files Browse the repository at this point in the history
  • Loading branch information
addaleax committed Oct 3, 2019
1 parent a529ed5 commit eb88db6
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 17 deletions.
1 change: 0 additions & 1 deletion lib/net.js
Original file line number Diff line number Diff line change
Expand Up @@ -770,7 +770,6 @@ Socket.prototype._writeGeneric = function(writev, data, encoding, cb) {
this[kLastWriteQueueSize] = req.bytes;
};


Socket.prototype._writev = function(chunks, cb) {
this._writeGeneric(true, chunks, '', cb);
};
Expand Down
56 changes: 43 additions & 13 deletions src/stream_pipe.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ StreamPipe::StreamPipe(StreamBase* source,
source->PushStreamListener(&readable_listener_);
sink->PushStreamListener(&writable_listener_);

CHECK(sink->HasWantsWrite());
uses_wants_write_ = sink->HasWantsWrite();

// Set up links between this object and the source/sink objects.
// In particular, this makes sure that they are garbage collected as a group,
Expand Down Expand Up @@ -66,7 +66,8 @@ void StreamPipe::Unpipe() {
is_closed_ = true;
is_reading_ = false;
source()->RemoveStreamListener(&readable_listener_);
sink()->RemoveStreamListener(&writable_listener_);
if (pending_writes_ == 0)
sink()->RemoveStreamListener(&writable_listener_);

// Delay the JS-facing part with SetImmediate, because this might be from
// inside the garbage collector, so we can’t run JS here.
Expand Down Expand Up @@ -124,13 +125,16 @@ void StreamPipe::ReadableListener::OnStreamRead(ssize_t nread,
// EOF or error; stop reading and pass the error to the previous listener
// (which might end up in JS).
pipe->is_eof_ = true;
// Cache `sink()` here because the previous listener might do things
// that eventually lead to an `Unpipe()` call.
StreamBase* sink = pipe->sink();
stream()->ReadStop();
CHECK_NOT_NULL(previous_listener_);
previous_listener_->OnStreamRead(nread, uv_buf_init(nullptr, 0));
// If we’re not writing, close now. Otherwise, we’ll do that in
// `OnStreamAfterWrite()`.
if (!pipe->is_writing_) {
pipe->ShutdownWritable();
if (pipe->pending_writes_ == 0) {
sink->Shutdown();
pipe->Unpipe();
}
return;
Expand All @@ -140,30 +144,38 @@ void StreamPipe::ReadableListener::OnStreamRead(ssize_t nread,
}

void StreamPipe::ProcessData(size_t nread, AllocatedBuffer&& buf) {
CHECK(uses_wants_write_ || pending_writes_ == 0);
uv_buf_t buffer = uv_buf_init(buf.data(), nread);
StreamWriteResult res = sink()->Write(&buffer, 1);
pending_writes_++;
if (!res.async) {
writable_listener_.OnStreamAfterWrite(nullptr, res.err);
} else {
is_writing_ = true;
is_reading_ = false;
res.wrap->SetAllocatedStorage(std::move(buf));
if (source() != nullptr)
source()->ReadStop();
}
}

void StreamPipe::ShutdownWritable() {
sink()->Shutdown();
}

void StreamPipe::WritableListener::OnStreamAfterWrite(WriteWrap* w,
int status) {
StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
pipe->is_writing_ = false;
pipe->pending_writes_--;
if (pipe->is_closed_) {
if (pipe->pending_writes_ == 0) {
Environment* env = pipe->env();
HandleScope handle_scope(env->isolate());
Context::Scope context_scope(env->context());
pipe->MakeCallback(env->oncomplete_string(), 0, nullptr).ToLocalChecked();
stream()->RemoveStreamListener(this);
}
return;
}

if (pipe->is_eof_) {
AsyncScope async_scope(pipe);
pipe->ShutdownWritable();
pipe->sink()->Shutdown();
pipe->Unpipe();
return;
}
Expand All @@ -175,6 +187,10 @@ void StreamPipe::WritableListener::OnStreamAfterWrite(WriteWrap* w,
prev->OnStreamAfterWrite(w, status);
return;
}

if (!pipe->uses_wants_write_) {
OnStreamWantsWrite(65536);
}
}

void StreamPipe::WritableListener::OnStreamAfterShutdown(ShutdownWrap* w,
Expand All @@ -198,6 +214,7 @@ void StreamPipe::WritableListener::OnStreamDestroy() {
StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
pipe->sink_destroyed_ = true;
pipe->is_eof_ = true;
pipe->pending_writes_ = 0;
pipe->Unpipe();
}

Expand Down Expand Up @@ -236,8 +253,7 @@ void StreamPipe::Start(const FunctionCallbackInfo<Value>& args) {
StreamPipe* pipe;
ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
pipe->is_closed_ = false;
if (pipe->wanted_data_ > 0)
pipe->writable_listener_.OnStreamWantsWrite(pipe->wanted_data_);
pipe->writable_listener_.OnStreamWantsWrite(65536);
}

void StreamPipe::Unpipe(const FunctionCallbackInfo<Value>& args) {
Expand All @@ -246,6 +262,18 @@ void StreamPipe::Unpipe(const FunctionCallbackInfo<Value>& args) {
pipe->Unpipe();
}

void StreamPipe::IsClosed(const FunctionCallbackInfo<Value>& args) {
StreamPipe* pipe;
ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
args.GetReturnValue().Set(pipe->is_closed_);
}

void StreamPipe::PendingWrites(const FunctionCallbackInfo<Value>& args) {
StreamPipe* pipe;
ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
args.GetReturnValue().Set(pipe->pending_writes_);
}

namespace {

void InitializeStreamPipe(Local<Object> target,
Expand All @@ -260,6 +288,8 @@ void InitializeStreamPipe(Local<Object> target,
FIXED_ONE_BYTE_STRING(env->isolate(), "StreamPipe");
env->SetProtoMethod(pipe, "unpipe", StreamPipe::Unpipe);
env->SetProtoMethod(pipe, "start", StreamPipe::Start);
env->SetProtoMethod(pipe, "isClosed", StreamPipe::IsClosed);
env->SetProtoMethod(pipe, "pendingWrites", StreamPipe::PendingWrites);
pipe->Inherit(AsyncWrap::GetConstructorTemplate(env));
pipe->SetClassName(stream_pipe_string);
pipe->InstanceTemplate()->SetInternalFieldCount(1);
Expand Down
7 changes: 4 additions & 3 deletions src/stream_pipe.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ class StreamPipe : public AsyncWrap {
static void New(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Start(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Unpipe(const v8::FunctionCallbackInfo<v8::Value>& args);
static void IsClosed(const v8::FunctionCallbackInfo<v8::Value>& args);
static void PendingWrites(const v8::FunctionCallbackInfo<v8::Value>& args);

SET_NO_MEMORY_INFO()
SET_MEMORY_INFO_NAME(StreamPipe)
Expand All @@ -26,14 +28,13 @@ class StreamPipe : public AsyncWrap {
inline StreamBase* source();
inline StreamBase* sink();

inline void ShutdownWritable();

int pending_writes_ = 0;
bool is_reading_ = false;
bool is_writing_ = false;
bool is_eof_ = false;
bool is_closed_ = true;
bool sink_destroyed_ = false;
bool source_destroyed_ = false;
bool uses_wants_write_ = false;

// Set a default value so that when we’re coming from Start(), we know
// that we don’t want to read just yet.
Expand Down

0 comments on commit eb88db6

Please sign in to comment.