From 023548c0a55151b04a45dfbb405238ec030fc9bd Mon Sep 17 00:00:00 2001 From: Jordan Schidlowsky Date: Thu, 29 Jul 2021 14:11:27 -0600 Subject: [PATCH] Websocket peer outbound buffer fixes. Expose outbound buffered amount. --- modules/websocket/doc_classes/WebSocketPeer.xml | 7 +++++++ modules/websocket/emws_client.cpp | 4 +++- modules/websocket/emws_client.h | 1 + modules/websocket/emws_peer.cpp | 14 +++++++++++++- modules/websocket/emws_peer.h | 5 ++++- modules/websocket/library_godot_websocket.js | 14 ++++++++++++++ modules/websocket/websocket_peer.cpp | 1 + modules/websocket/websocket_peer.h | 1 + modules/websocket/wsl_peer.cpp | 14 +++++++++++++- modules/websocket/wsl_peer.h | 4 ++++ 10 files changed, 61 insertions(+), 4 deletions(-) diff --git a/modules/websocket/doc_classes/WebSocketPeer.xml b/modules/websocket/doc_classes/WebSocketPeer.xml index 0a99fe54d6bf..7add499d20ef 100644 --- a/modules/websocket/doc_classes/WebSocketPeer.xml +++ b/modules/websocket/doc_classes/WebSocketPeer.xml @@ -39,6 +39,13 @@ [b]Note:[/b] Not available in the HTML5 export. + + + + + Returns the current amount of data in the outbound websocket buffer. [b]Note:[/b] HTML5 exports use WebSocket.bufferedAmount, while other platforms use an internal buffer. + + diff --git a/modules/websocket/emws_client.cpp b/modules/websocket/emws_client.cpp index 66c7efb1a6e0..b814980fad59 100644 --- a/modules/websocket/emws_client.cpp +++ b/modules/websocket/emws_client.cpp @@ -95,7 +95,7 @@ Error EMWSClient::connect_to_host(String p_host, String p_path, uint16_t p_port, return FAILED; } - static_cast>(_peer)->set_sock(_js_id, _in_buf_size, _in_pkt_size); + static_cast>(_peer)->set_sock(_js_id, _in_buf_size, _in_pkt_size, _out_buf_size); return OK; } @@ -136,12 +136,14 @@ int EMWSClient::get_max_packet_size() const { Error EMWSClient::set_buffers(int p_in_buffer, int p_in_packets, int p_out_buffer, int p_out_packets) { _in_buf_size = nearest_shift(p_in_buffer - 1) + 10; _in_pkt_size = nearest_shift(p_in_packets - 1); + _out_buf_size = nearest_shift(p_out_buffer - 1) + 10; return OK; } EMWSClient::EMWSClient() { _in_buf_size = nearest_shift((int)GLOBAL_GET(WSC_IN_BUF) - 1) + 10; _in_pkt_size = nearest_shift((int)GLOBAL_GET(WSC_IN_PKT) - 1); + _out_buf_size = nearest_shift((int)GLOBAL_GET(WSC_OUT_BUF) - 1) + 10; _is_connecting = false; _peer = Ref(memnew(EMWSPeer)); _js_id = 0; diff --git a/modules/websocket/emws_client.h b/modules/websocket/emws_client.h index 81bf0acc2928..44f0db97a1c8 100644 --- a/modules/websocket/emws_client.h +++ b/modules/websocket/emws_client.h @@ -45,6 +45,7 @@ class EMWSClient : public WebSocketClient { bool _is_connecting; int _in_buf_size; int _in_pkt_size; + int _out_buf_size; static void _esws_on_connect(void *obj, char *proto); static void _esws_on_message(void *obj, const uint8_t *p_data, int p_data_size, int p_is_string); diff --git a/modules/websocket/emws_peer.cpp b/modules/websocket/emws_peer.cpp index 58f276232bfc..96fcb307d6ed 100644 --- a/modules/websocket/emws_peer.cpp +++ b/modules/websocket/emws_peer.cpp @@ -33,10 +33,11 @@ #include "emws_peer.h" #include "core/io/ip.h" -void EMWSPeer::set_sock(int p_sock, unsigned int p_in_buf_size, unsigned int p_in_pkt_size) { +void EMWSPeer::set_sock(int p_sock, unsigned int p_in_buf_size, unsigned int p_in_pkt_size, unsigned int p_out_buf_size) { peer_sock = p_sock; _in_buffer.resize(p_in_pkt_size, p_in_buf_size); _packet_buffer.resize((1 << p_in_buf_size)); + _out_buf_size = p_out_buf_size; } void EMWSPeer::set_write_mode(WriteMode p_mode) { @@ -53,7 +54,10 @@ Error EMWSPeer::read_msg(const uint8_t *p_data, uint32_t p_size, bool p_is_strin } Error EMWSPeer::put_packet(const uint8_t *p_buffer, int p_buffer_size) { + ERR_FAIL_COND_V(_out_buf_size && (godot_js_websocket_buffered_amount(peer_sock) >= (1ULL << _out_buf_size)), ERR_OUT_OF_MEMORY); + int is_bin = write_mode == WebSocketPeer::WRITE_MODE_BINARY ? 1 : 0; + godot_js_websocket_send(peer_sock, p_buffer, p_buffer_size, is_bin); return OK; }; @@ -77,6 +81,13 @@ int EMWSPeer::get_available_packet_count() const { return _in_buffer.packets_left(); }; +int EMWSPeer::get_current_outbound_buffered_amount() const { + if (peer_sock != -1) { + return godot_js_websocket_buffered_amount(peer_sock); + } + return 0; +} + bool EMWSPeer::was_string_packet() const { return _is_string; }; @@ -107,6 +118,7 @@ void EMWSPeer::set_no_delay(bool p_enabled) { } EMWSPeer::EMWSPeer() { + _out_buf_size = 0; peer_sock = -1; write_mode = WRITE_MODE_BINARY; close(); diff --git a/modules/websocket/emws_peer.h b/modules/websocket/emws_peer.h index 292b5589490b..8134835f5e89 100644 --- a/modules/websocket/emws_peer.h +++ b/modules/websocket/emws_peer.h @@ -48,6 +48,7 @@ typedef void (*WSOnError)(void *p_ref); extern int godot_js_websocket_create(void *p_ref, const char *p_url, const char *p_proto, WSOnOpen p_on_open, WSOnMessage p_on_message, WSOnError p_on_error, WSOnClose p_on_close); extern int godot_js_websocket_send(int p_id, const uint8_t *p_buf, int p_buf_len, int p_raw); +extern int godot_js_websocket_buffered_amount(int p_id); extern void godot_js_websocket_close(int p_id, int p_code, const char *p_reason); extern void godot_js_websocket_destroy(int p_id); } @@ -62,14 +63,16 @@ class EMWSPeer : public WebSocketPeer { PoolVector _packet_buffer; PacketBuffer _in_buffer; uint8_t _is_string; + int _out_buf_size; public: Error read_msg(const uint8_t *p_data, uint32_t p_size, bool p_is_string); - void set_sock(int p_sock, unsigned int p_in_buf_size, unsigned int p_in_pkt_size); + void set_sock(int p_sock, unsigned int p_in_buf_size, unsigned int p_in_pkt_size, unsigned int p_out_buf_size); virtual int get_available_packet_count() const; virtual Error get_packet(const uint8_t **r_buffer, int &r_buffer_size); virtual Error put_packet(const uint8_t *p_buffer, int p_buffer_size); virtual int get_max_packet_size() const { return _packet_buffer.size(); }; + virtual int get_current_outbound_buffered_amount() const; virtual void close(int p_code = 1000, String p_reason = ""); virtual bool is_connected_to_host() const; diff --git a/modules/websocket/library_godot_websocket.js b/modules/websocket/library_godot_websocket.js index b182d1ecde19..dd2fd1e94f75 100644 --- a/modules/websocket/library_godot_websocket.js +++ b/modules/websocket/library_godot_websocket.js @@ -101,6 +101,15 @@ const GodotWebSocket = { return 0; }, + // Get current bufferedAmount + bufferedAmount: function (p_id) { + const ref = IDHandler.get(p_id); + if (!ref) { + return 0; // Godot object is gone. + } + return ref.bufferedAmount; + }, + create: function (socket, p_on_open, p_on_message, p_on_error, p_on_close) { const id = IDHandler.add(socket); socket.onopen = GodotWebSocket._onopen.bind(null, id, p_on_open); @@ -171,6 +180,11 @@ const GodotWebSocket = { return GodotWebSocket.send(p_id, out); }, + godot_js_websocket_buffered_amount__sig: 'ii', + godot_js_websocket_buffered_amount: function (p_id) { + return GodotWebSocket.bufferedAmount(p_id); + }, + godot_js_websocket_close__sig: 'viii', godot_js_websocket_close: function (p_id, p_code, p_reason) { const code = p_code; diff --git a/modules/websocket/websocket_peer.cpp b/modules/websocket/websocket_peer.cpp index e77fdcfed202..ee13040821fd 100644 --- a/modules/websocket/websocket_peer.cpp +++ b/modules/websocket/websocket_peer.cpp @@ -47,6 +47,7 @@ void WebSocketPeer::_bind_methods() { ClassDB::bind_method(D_METHOD("get_connected_host"), &WebSocketPeer::get_connected_host); ClassDB::bind_method(D_METHOD("get_connected_port"), &WebSocketPeer::get_connected_port); ClassDB::bind_method(D_METHOD("set_no_delay", "enabled"), &WebSocketPeer::set_no_delay); + ClassDB::bind_method(D_METHOD("get_current_outbound_buffered_amount"), &WebSocketPeer::get_current_outbound_buffered_amount); BIND_ENUM_CONSTANT(WRITE_MODE_TEXT); BIND_ENUM_CONSTANT(WRITE_MODE_BINARY); diff --git a/modules/websocket/websocket_peer.h b/modules/websocket/websocket_peer.h index 04138289ec28..683dacc97ce4 100644 --- a/modules/websocket/websocket_peer.h +++ b/modules/websocket/websocket_peer.h @@ -53,6 +53,7 @@ class WebSocketPeer : public PacketPeer { virtual Error get_packet(const uint8_t **r_buffer, int &r_buffer_size) = 0; virtual Error put_packet(const uint8_t *p_buffer, int p_buffer_size) = 0; virtual int get_max_packet_size() const = 0; + virtual int get_current_outbound_buffered_amount() const = 0; virtual WriteMode get_write_mode() const = 0; virtual void set_write_mode(WriteMode p_mode) = 0; diff --git a/modules/websocket/wsl_peer.cpp b/modules/websocket/wsl_peer.cpp index c5326f1eb190..ad0935399758 100644 --- a/modules/websocket/wsl_peer.cpp +++ b/modules/websocket/wsl_peer.cpp @@ -205,7 +205,9 @@ void WSLPeer::make_context(PeerData *p_data, unsigned int p_in_buf_size, unsigne ERR_FAIL_COND(p_data == nullptr); _in_buffer.resize(p_in_pkt_size, p_in_buf_size); - _packet_buffer.resize((1 << MAX(p_in_buf_size, p_out_buf_size))); + _packet_buffer.resize(1 << p_in_buf_size); + _out_buf_size = p_out_buf_size; + _out_pkt_size = p_out_pkt_size; _data = p_data; _data->peer = this; @@ -239,6 +241,8 @@ void WSLPeer::poll() { Error WSLPeer::put_packet(const uint8_t *p_buffer, int p_buffer_size) { ERR_FAIL_COND_V(!is_connected_to_host(), FAILED); + ERR_FAIL_COND_V(_out_pkt_size && (wslay_event_get_queued_msg_count(_data->ctx) >= (1ULL << _out_pkt_size)), ERR_OUT_OF_MEMORY); + ERR_FAIL_COND_V(_out_buf_size && (wslay_event_get_queued_msg_length(_data->ctx) >= (1ULL << _out_buf_size)), ERR_OUT_OF_MEMORY); struct wslay_event_msg msg; // Should I use fragmented? msg.opcode = write_mode == WRITE_MODE_TEXT ? WSLAY_TEXT_FRAME : WSLAY_BINARY_FRAME; @@ -280,6 +284,12 @@ int WSLPeer::get_available_packet_count() const { return _in_buffer.packets_left(); } +int WSLPeer::get_current_outbound_buffered_amount() const { + ERR_FAIL_COND_V(!_data, 0); + + return wslay_event_get_queued_msg_length(_data->ctx); +} + bool WSLPeer::was_string_packet() const { return _is_string; } @@ -333,6 +343,8 @@ WSLPeer::WSLPeer() { _is_string = 0; close_code = -1; write_mode = WRITE_MODE_BINARY; + _out_buf_size = 0; + _out_pkt_size = 0; } WSLPeer::~WSLPeer() { diff --git a/modules/websocket/wsl_peer.h b/modules/websocket/wsl_peer.h index 40c3ba21227a..20baf4f0ce7d 100644 --- a/modules/websocket/wsl_peer.h +++ b/modules/websocket/wsl_peer.h @@ -89,6 +89,9 @@ class WSLPeer : public WebSocketPeer { WriteMode write_mode; + int _out_buf_size; + int _out_pkt_size; + public: int close_code; String close_reason; @@ -98,6 +101,7 @@ class WSLPeer : public WebSocketPeer { virtual Error get_packet(const uint8_t **r_buffer, int &r_buffer_size); virtual Error put_packet(const uint8_t *p_buffer, int p_buffer_size); virtual int get_max_packet_size() const { return _packet_buffer.size(); }; + virtual int get_current_outbound_buffered_amount() const; virtual void close_now(); virtual void close(int p_code = 1000, String p_reason = "");