Skip to content

Commit

Permalink
inspector: simplify buffer management
Browse files Browse the repository at this point in the history
This change simplifies buffer management to address a number of issues
that original implementation had.

Original implementation was trying to reduce the number of allocations
by providing regions of the internal buffer to libuv IO code. This
introduced some potential use after free issues if the buffer grows
(or shrinks) while there's a pending read. It also had some confusing
math that resulted in issues on Windows version of the libuv.

PR-URL: nodejs#8257
Fixes: nodejs#8155
Reviewed-By: bnoordhuis - Ben Noordhuis <info@bnoordhuis.nl>
  • Loading branch information
Eugene Ostroukhov authored and Fishrock123 committed Sep 8, 2016
1 parent aae0593 commit 633c9b7
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 68 deletions.
2 changes: 1 addition & 1 deletion src/inspector_agent.cc
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ void InterruptCallback(v8::Isolate*, void* agent) {
}

void DataCallback(uv_stream_t* stream, ssize_t read, const uv_buf_t* buf) {
inspector_socket_t* socket = static_cast<inspector_socket_t*>(stream->data);
inspector_socket_t* socket = inspector_from_stream(stream);
static_cast<AgentImpl*>(socket->data)->OnRemoteDataIO(socket, read, buf);
}

Expand Down
99 changes: 41 additions & 58 deletions src/inspector_socket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,17 @@ static void dump_hex(const char* buf, size_t len) {
}
#endif

static void remove_from_beginning(std::vector<char>* buffer, size_t count) {
buffer->erase(buffer->begin(), buffer->begin() + count);
}

static void dispose_inspector(uv_handle_t* handle) {
inspector_socket_t* inspector =
reinterpret_cast<inspector_socket_t*>(handle->data);
inspector_socket_t* inspector = inspector_from_stream(handle);
inspector_cb close =
inspector->ws_mode ? inspector->ws_state->close_cb : nullptr;
inspector->buffer.clear();
delete inspector->ws_state;
inspector->ws_state = nullptr;
inspector->data_len = 0;
inspector->last_read_end = 0;
if (close) {
close(inspector, 0);
}
Expand Down Expand Up @@ -159,21 +160,19 @@ static std::vector<char> encode_frame_hybi17(const char* message,
return frame;
}

static ws_decode_result decode_frame_hybi17(const char* buffer_begin,
size_t data_length,
static ws_decode_result decode_frame_hybi17(const std::vector<char>& buffer,
bool client_frame,
int* bytes_consumed,
std::vector<char>* output,
bool* compressed) {
*bytes_consumed = 0;
if (data_length < 2)
if (buffer.size() < 2)
return FRAME_INCOMPLETE;

const char* p = buffer_begin;
const char* buffer_end = p + data_length;
auto it = buffer.begin();

unsigned char first_byte = *p++;
unsigned char second_byte = *p++;
unsigned char first_byte = *it++;
unsigned char second_byte = *it++;

bool final = (first_byte & kFinalBit) != 0;
bool reserved1 = (first_byte & kReserved1Bit) != 0;
Expand Down Expand Up @@ -215,12 +214,12 @@ static ws_decode_result decode_frame_hybi17(const char* buffer_begin,
} else {
return FRAME_ERROR;
}
if (buffer_end - p < extended_payload_length_size)
if ((buffer.end() - it) < extended_payload_length_size)
return FRAME_INCOMPLETE;
payload_length64 = 0;
for (int i = 0; i < extended_payload_length_size; ++i) {
payload_length64 <<= 8;
payload_length64 |= static_cast<unsigned char>(*p++);
payload_length64 |= static_cast<unsigned char>(*it++);
}
}

Expand All @@ -233,16 +232,16 @@ static ws_decode_result decode_frame_hybi17(const char* buffer_begin,
}
size_t payload_length = static_cast<size_t>(payload_length64);

if (data_length - kMaskingKeyWidthInBytes < payload_length)
if (buffer.size() - kMaskingKeyWidthInBytes < payload_length)
return FRAME_INCOMPLETE;

const char* masking_key = p;
const char* payload = p + kMaskingKeyWidthInBytes;
std::vector<char>::const_iterator masking_key = it;
std::vector<char>::const_iterator payload = it + kMaskingKeyWidthInBytes;
for (size_t i = 0; i < payload_length; ++i) // Unmask the payload.
output->insert(output->end(),
payload[i] ^ masking_key[i % kMaskingKeyWidthInBytes]);

size_t pos = p + kMaskingKeyWidthInBytes + payload_length - buffer_begin;
size_t pos = it + kMaskingKeyWidthInBytes + payload_length - buffer.begin();
*bytes_consumed = pos;
return closed ? FRAME_CLOSE : FRAME_OK;
}
Expand Down Expand Up @@ -280,13 +279,13 @@ static void close_frame_received(inspector_socket_t* inspector) {
}
}

static int parse_ws_frames(inspector_socket_t* inspector, size_t len) {
static int parse_ws_frames(inspector_socket_t* inspector) {
int bytes_consumed = 0;
std::vector<char> output;
bool compressed = false;

ws_decode_result r = decode_frame_hybi17(&inspector->buffer[0],
len, true /* client_frame */,
ws_decode_result r = decode_frame_hybi17(inspector->buffer,
true /* client_frame */,
&bytes_consumed, &output,
&compressed);
// Compressed frame means client is ignoring the headers and misbehaves
Expand All @@ -312,24 +311,22 @@ static int parse_ws_frames(inspector_socket_t* inspector, size_t len) {
}

static void prepare_buffer(uv_handle_t* stream, size_t len, uv_buf_t* buf) {
inspector_socket_t* inspector =
reinterpret_cast<inspector_socket_t*>(stream->data);
*buf = uv_buf_init(new char[len], len);
}

if (len > (inspector->buffer.size() - inspector->data_len)) {
int new_size = (inspector->data_len + len + BUFFER_GROWTH_CHUNK_SIZE - 1) /
BUFFER_GROWTH_CHUNK_SIZE *
BUFFER_GROWTH_CHUNK_SIZE;
inspector->buffer.resize(new_size);
static void reclaim_uv_buf(inspector_socket_t* inspector, const uv_buf_t* buf,
ssize_t read) {
if (read > 0) {
std::vector<char>& buffer = inspector->buffer;
buffer.insert(buffer.end(), buf->base, buf->base + read);
}
buf->base = &inspector->buffer[inspector->data_len];
buf->len = len;
inspector->data_len += len;
delete[] buf->base;
}

static void websockets_data_cb(uv_stream_t* stream, ssize_t nread,
const uv_buf_t* buf) {
inspector_socket_t* inspector =
reinterpret_cast<inspector_socket_t*>(stream->data);
inspector_socket_t* inspector = inspector_from_stream(stream);
reclaim_uv_buf(inspector, buf, nread);
if (nread < 0 || nread == UV_EOF) {
inspector->connection_eof = true;
if (!inspector->shutting_down && inspector->ws_state->read_cb) {
Expand All @@ -339,29 +336,19 @@ static void websockets_data_cb(uv_stream_t* stream, ssize_t nread,
#if DUMP_READS
printf("%s read %ld bytes\n", __FUNCTION__, nread);
if (nread > 0) {
dump_hex(buf->base, nread);
dump_hex(inspector->buffer.data() + inspector->buffer.size() - nread,
nread);
}
#endif
// 1. Move read bytes to continue the buffer
// Should be same as this is supposedly last buffer
ASSERT_EQ(buf->base + buf->len, &inspector->buffer[inspector->data_len]);

// Should be noop...
memmove(&inspector->buffer[inspector->last_read_end], buf->base, nread);
inspector->last_read_end += nread;

// 2. Parse.
int processed = 0;
do {
processed = parse_ws_frames(inspector, inspector->last_read_end);
processed = parse_ws_frames(inspector);
// 3. Fix the buffer size & length
if (processed > 0) {
memmove(&inspector->buffer[0], &inspector->buffer[processed],
inspector->last_read_end - processed);
inspector->last_read_end -= processed;
inspector->data_len = inspector->last_read_end;
remove_from_beginning(&inspector->buffer, processed);
}
} while (processed > 0 && inspector->data_len > 0);
} while (processed > 0 && !inspector->buffer.empty());
}
}

Expand Down Expand Up @@ -435,7 +422,6 @@ static void handshake_complete(inspector_socket_t* inspector) {
uv_read_stop(reinterpret_cast<uv_stream_t*>(&inspector->client));
handshake_cb callback = inspector->http_parsing_state->callback;
inspector->ws_state = new ws_state_s();
inspector->last_read_end = 0;
inspector->ws_mode = true;
callback(inspector, kInspectorHandshakeUpgraded,
inspector->http_parsing_state->path);
Expand All @@ -448,8 +434,7 @@ static void cleanup_http_parsing_state(inspector_socket_t* inspector) {

static void report_handshake_failure_cb(uv_handle_t* handle) {
dispose_inspector(handle);
inspector_socket_t* inspector =
static_cast<inspector_socket_t*>(handle->data);
inspector_socket_t* inspector = inspector_from_stream(handle);
handshake_cb cb = inspector->http_parsing_state->callback;
cleanup_http_parsing_state(inspector);
cb(inspector, kInspectorHandshakeFailed, std::string());
Expand Down Expand Up @@ -481,8 +466,7 @@ static void init_handshake(inspector_socket_t* inspector);
static int message_complete_cb(http_parser* parser) {
inspector_socket_t* inspector =
reinterpret_cast<inspector_socket_t*>(parser->data);
struct http_parsing_state_s* state =
(struct http_parsing_state_s*) inspector->http_parsing_state;
struct http_parsing_state_s* state = inspector->http_parsing_state;
if (parser->method != HTTP_GET) {
handshake_failed(inspector);
} else if (!parser->upgrade) {
Expand Down Expand Up @@ -527,22 +511,22 @@ static void data_received_cb(uv_stream_s* client, ssize_t nread,
printf("[%s:%d] %s\n", __FUNCTION__, __LINE__, uv_err_name(nread));
}
#endif
inspector_socket_t* inspector =
reinterpret_cast<inspector_socket_t*>((client->data));
inspector_socket_t* inspector = inspector_from_stream(client);
reclaim_uv_buf(inspector, buf, nread);
if (nread < 0 || nread == UV_EOF) {
close_and_report_handshake_failure(inspector);
} else {
http_parsing_state_s* state = inspector->http_parsing_state;
http_parser* parser = &state->parser;
http_parser_execute(parser, &state->parser_settings, &inspector->buffer[0],
nread);
http_parser_execute(parser, &state->parser_settings,
inspector->buffer.data(), nread);
remove_from_beginning(&inspector->buffer, nread);
if (parser->http_errno != HPE_OK) {
handshake_failed(inspector);
}
if (inspector->http_parsing_state->done) {
cleanup_http_parsing_state(inspector);
}
inspector->data_len = 0;
}
}

Expand Down Expand Up @@ -576,7 +560,6 @@ int inspector_accept(uv_stream_t* server, inspector_socket_t* inspector,
err = uv_accept(server, client);
}
if (err == 0) {
client->data = inspector;
init_handshake(inspector);
inspector->http_parsing_state->callback = callback;
err = uv_read_start(client, prepare_buffer,
Expand Down
19 changes: 16 additions & 3 deletions src/inspector_socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
#define SRC_INSPECTOR_SOCKET_H_

#include "http_parser.h"
#include "util.h"
#include "util-inl.h"
#include "uv.h"

#include <string>
Expand Down Expand Up @@ -48,8 +50,6 @@ struct inspector_socket_s {
struct http_parsing_state_s* http_parsing_state;
struct ws_state_s* ws_state;
std::vector<char> buffer;
size_t data_len;
size_t last_read_end;
uv_tcp_t client;
bool ws_mode;
bool shutting_down;
Expand All @@ -64,12 +64,25 @@ int inspector_accept(uv_stream_t* server, struct inspector_socket_s* inspector,
void inspector_close(struct inspector_socket_s* inspector,
inspector_cb callback);

// Callbacks will receive handles that has inspector in data field...
// Callbacks will receive stream handles. Use inspector_from_stream to get
// inspector_socket_t* from the stream handle.
int inspector_read_start(struct inspector_socket_s* inspector, uv_alloc_cb,
uv_read_cb);
void inspector_read_stop(struct inspector_socket_s* inspector);
void inspector_write(struct inspector_socket_s* inspector,
const char* data, size_t len);
bool inspector_is_active(const struct inspector_socket_s* inspector);

inline inspector_socket_t* inspector_from_stream(uv_tcp_t* stream) {
return node::ContainerOf(&inspector_socket_t::client, stream);
}

inline inspector_socket_t* inspector_from_stream(uv_stream_t* stream) {
return inspector_from_stream(reinterpret_cast<uv_tcp_t*>(stream));
}

inline inspector_socket_t* inspector_from_stream(uv_handle_t* stream) {
return inspector_from_stream(reinterpret_cast<uv_tcp_t*>(stream));
}

#endif // SRC_INSPECTOR_SOCKET_H_
10 changes: 4 additions & 6 deletions test/cctest/test_inspector_socket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ struct expectations {

static void grow_expects_buffer(uv_handle_t* stream, size_t size, uv_buf_t* b) {
expectations* expects = static_cast<expectations*>(
(static_cast<inspector_socket_t*>(stream->data))->data);
inspector_from_stream(stream)->data);
size_t end = expects->actual_end;
// Grow the buffer in chunks of 64k.
size_t new_length = (end + size + 65535) & ~((size_t) 0xFFFF);
Expand Down Expand Up @@ -213,7 +213,7 @@ static void grow_expects_buffer(uv_handle_t* stream, size_t size, uv_buf_t* b) {
static void save_read_data(uv_stream_t* stream, ssize_t nread,
const uv_buf_t* buf) {
expectations* expects =static_cast<expectations*>(
(static_cast<inspector_socket_t*>(stream->data))->data);
inspector_from_stream(stream)->data);
expects->err_code = nread < 0 ? nread : 0;
if (nread > 0) {
expects->actual_end += nread;
Expand Down Expand Up @@ -254,8 +254,7 @@ static void expect_on_server(const char* data, size_t len) {

static void inspector_record_error_code(uv_stream_t* stream, ssize_t nread,
const uv_buf_t* buf) {
inspector_socket_t *inspector =
reinterpret_cast<inspector_socket_t*>(stream->data);
inspector_socket_t *inspector = inspector_from_stream(stream);
// Increment instead of assign is to ensure the function is only called once
*(static_cast<int *>(inspector->data)) += nread;
}
Expand Down Expand Up @@ -760,8 +759,7 @@ static void CleanupSocketAfterEOF_close_cb(inspector_socket_t* inspector,
static void CleanupSocketAfterEOF_read_cb(uv_stream_t* stream, ssize_t nread,
const uv_buf_t* buf) {
EXPECT_EQ(UV_EOF, nread);
inspector_socket_t* insp =
reinterpret_cast<inspector_socket_t*>(stream->data);
inspector_socket_t* insp = inspector_from_stream(stream);
inspector_close(insp, CleanupSocketAfterEOF_close_cb);
}

Expand Down

0 comments on commit 633c9b7

Please sign in to comment.