Skip to content

Commit

Permalink
Merge branch 'develop' into feature/async-framework-repeating-op
Browse files Browse the repository at this point in the history
  • Loading branch information
godexsoft authored Dec 19, 2024
2 parents 51d1503 + 7d4e361 commit 8c86e9a
Show file tree
Hide file tree
Showing 13 changed files with 238 additions and 204 deletions.
25 changes: 14 additions & 11 deletions src/web/ng/Connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,9 @@ class Connection : public ConnectionMetadata {
public:
/**
* @brief The default timeout for send, receive, and close operations.
* @note This value should be higher than forwarding timeout to not disconnect clients if rippled is slow.
*/
static constexpr std::chrono::steady_clock::duration DEFAULT_TIMEOUT = std::chrono::seconds{30};
static constexpr std::chrono::steady_clock::duration DEFAULT_TIMEOUT = std::chrono::seconds{11};

/**
* @brief Construct a new Connection object
Expand All @@ -116,39 +117,41 @@ class Connection : public ConnectionMetadata {
*/
Connection(std::string ip, boost::beast::flat_buffer buffer, util::TagDecoratorFactory const& tagDecoratorFactory);

/**
* @brief Get the timeout for send, receive, and close operations. For WebSocket connections, this is the ping
* interval.
*
* @param newTimeout The new timeout to set.
*/
virtual void
setTimeout(std::chrono::steady_clock::duration newTimeout) = 0;

/**
* @brief Send a response to the client.
*
* @param response The response to send.
* @param yield The yield context.
* @param timeout The timeout for the operation.
* @return An error if the operation failed or nullopt if it succeeded.
*/
virtual std::optional<Error>
send(
Response response,
boost::asio::yield_context yield,
std::chrono::steady_clock::duration timeout = DEFAULT_TIMEOUT
) = 0;
send(Response response, boost::asio::yield_context yield) = 0;

/**
* @brief Receive a request from the client.
*
* @param yield The yield context.
* @param timeout The timeout for the operation.
* @return The request if it was received or an error if the operation failed.
*/
virtual std::expected<Request, Error>
receive(boost::asio::yield_context yield, std::chrono::steady_clock::duration timeout = DEFAULT_TIMEOUT) = 0;
receive(boost::asio::yield_context yield) = 0;

/**
* @brief Gracefully close the connection.
*
* @param yield The yield context.
* @param timeout The timeout for the operation.
*/
virtual void
close(boost::asio::yield_context yield, std::chrono::steady_clock::duration timeout = DEFAULT_TIMEOUT) = 0;
close(boost::asio::yield_context yield) = 0;
};

/**
Expand Down
3 changes: 3 additions & 0 deletions src/web/ng/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ Server::onWs(MessageHandler handler)
std::optional<std::string>
Server::run()
{
LOG(log_.info()) << "Starting ng::Server";
auto acceptor = makeAcceptor(ctx_.get(), endpoint_);
if (not acceptor.has_value())
return std::move(acceptor).error();
Expand All @@ -236,6 +237,7 @@ Server::run()
boost::asio::ip::tcp::socket socket{ctx_.get().get_executor()};

acceptor.async_accept(socket, yield[errorCode]);
LOG(log_.trace()) << "Accepted a new connection";
if (errorCode) {
LOG(log_.debug()) << "Error accepting a connection: " << errorCode.what();
continue;
Expand Down Expand Up @@ -290,6 +292,7 @@ Server::handleConnection(boost::asio::ip::tcp::socket socket, boost::asio::yield
}
return;
}
LOG(log_.trace()) << connectionExpected.value()->tag() << "Connection created";

boost::asio::spawn(
ctx_.get(),
Expand Down
23 changes: 20 additions & 3 deletions src/web/ng/impl/ConnectionHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ ConnectionHandler::processConnection(ConnectionPtr connectionPtr, boost::asio::y
yield,
[this](Error const& e, Connection const& c) { return handleError(e, c); }
);
LOG(log_.trace()) << connectionRef.tag() << "Created SubscriptionContext for the connection";
}
SubscriptionContextPtr subscriptionContextInterfacePtr = subscriptionContext;

Expand All @@ -166,14 +167,21 @@ ConnectionHandler::processConnection(ConnectionPtr connectionPtr, boost::asio::y
break;
}

if (subscriptionContext != nullptr)
if (subscriptionContext != nullptr) {
subscriptionContext->disconnect(yield);
LOG(log_.trace()) << connectionRef.tag() << "SubscriptionContext disconnected";
}

if (shouldCloseGracefully)
if (shouldCloseGracefully) {
connectionRef.close(yield);
LOG(log_.trace()) << connectionRef.tag() << "Closed gracefully";
}

signalConnection.disconnect();
LOG(log_.trace()) << connectionRef.tag() << "Signal disconnected";

onDisconnectHook_(connectionRef);
LOG(log_.trace()) << connectionRef.tag() << "Processing finished";
}

void
Expand All @@ -185,6 +193,7 @@ ConnectionHandler::stop()
bool
ConnectionHandler::handleError(Error const& error, Connection const& connection) const
{
LOG(log_.trace()) << connection.tag() << "Got error: " << error << " " << error.message();
// ssl::error::stream_truncated, also known as an SSL "short read",
// indicates the peer closed the connection without performing the
// required closing handshake (for example, Google does this to
Expand All @@ -201,7 +210,8 @@ ConnectionHandler::handleError(Error const& error, Connection const& connection)
// Beast returns the error boost::beast::http::error::partial_message.
// Therefore, if we see a short read here, it has occurred
// after the message has been completed, so it is safe to ignore it.
if (error == boost::beast::http::error::end_of_stream || error == boost::asio::ssl::error::stream_truncated)
if (error == boost::beast::http::error::end_of_stream || error == boost::asio::ssl::error::stream_truncated ||
error == boost::asio::error::eof)
return false;

// WebSocket connection was gracefully closed
Expand Down Expand Up @@ -229,6 +239,7 @@ ConnectionHandler::sequentRequestResponseLoop(
// an error appears.
// - When server is shutting down it will cancel all operations on the connection so an error appears.

LOG(log_.trace()) << connection.tag() << "Processing sequentially";
while (true) {
auto expectedRequest = connection.receive(yield);
if (not expectedRequest)
Expand All @@ -250,12 +261,14 @@ ConnectionHandler::parallelRequestResponseLoop(
boost::asio::yield_context yield
)
{
LOG(log_.trace()) << connection.tag() << "Processing in parallel";
// atomic_bool is not needed here because everything happening on coroutine's strand
bool stop = false;
bool closeConnectionGracefully = true;
util::CoroutineGroup tasksGroup{yield, maxParallelRequests_};

while (not stop) {
LOG(log_.trace()) << connection.tag() << "Receiving request";
auto expectedRequest = connection.receive(yield);
if (not expectedRequest) {
auto const closeGracefully = handleError(expectedRequest.error(), connection);
Expand All @@ -282,7 +295,9 @@ ConnectionHandler::parallelRequestResponseLoop(
}
);
ASSERT(spawnSuccess, "The coroutine was expected to be spawned");
LOG(log_.trace()) << connection.tag() << "Spawned a coroutine to process request";
} else {
LOG(log_.trace()) << connection.tag() << "Too many requests from one connection, rejecting the request";
connection.send(
Response{
boost::beast::http::status::too_many_requests,
Expand All @@ -305,8 +320,10 @@ ConnectionHandler::processRequest(
boost::asio::yield_context yield
)
{
LOG(log_.trace()) << connection.tag() << "Processing request: " << request.message();
auto response = handleRequest(connection, subscriptionContext, request, yield);

LOG(log_.trace()) << connection.tag() << "Sending response: " << response.message();
auto const maybeError = connection.send(std::move(response), yield);
if (maybeError.has_value()) {
return handleError(maybeError.value(), connection);
Expand Down
61 changes: 26 additions & 35 deletions src/web/ng/impl/HttpConnection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,7 @@ class UpgradableConnection : public Connection {
using Connection::Connection;

virtual std::expected<bool, Error>
isUpgradeRequested(
boost::asio::yield_context yield,
std::chrono::steady_clock::duration timeout = DEFAULT_TIMEOUT
) = 0;
isUpgradeRequested(boost::asio::yield_context yield) = 0;

virtual std::expected<ConnectionPtr, Error>
upgrade(
Expand All @@ -69,8 +66,7 @@ class UpgradableConnection : public Connection {
virtual std::optional<Error>
sendRaw(
boost::beast::http::response<boost::beast::http::string_body> response,
boost::asio::yield_context yield,
std::chrono::steady_clock::duration timeout = DEFAULT_TIMEOUT
boost::asio::yield_context yield
) = 0;
};

Expand All @@ -80,6 +76,7 @@ template <typename StreamType>
class HttpConnection : public UpgradableConnection {
StreamType stream_;
std::optional<boost::beast::http::request<boost::beast::http::string_body>> request_;
std::chrono::steady_clock::duration timeout_{DEFAULT_TIMEOUT};

public:
HttpConnection(
Expand Down Expand Up @@ -113,68 +110,62 @@ class HttpConnection : public UpgradableConnection {
}

std::optional<Error>
sendRaw(
boost::beast::http::response<boost::beast::http::string_body> response,
boost::asio::yield_context yield,
std::chrono::steady_clock::duration timeout = DEFAULT_TIMEOUT
) override
sendRaw(boost::beast::http::response<boost::beast::http::string_body> response, boost::asio::yield_context yield)
override
{
boost::system::error_code error;
boost::beast::get_lowest_layer(stream_).expires_after(timeout);
boost::beast::get_lowest_layer(stream_).expires_after(timeout_);
boost::beast::http::async_write(stream_, response, yield[error]);
if (error)
return error;
return std::nullopt;
}

void
setTimeout(std::chrono::steady_clock::duration newTimeout) override
{
timeout_ = newTimeout;
}

std::optional<Error>
send(
Response response,
boost::asio::yield_context yield,
std::chrono::steady_clock::duration timeout = DEFAULT_TIMEOUT
) override
send(Response response, boost::asio::yield_context yield) override
{
auto httpResponse = std::move(response).intoHttpResponse();
return sendRaw(std::move(httpResponse), yield, timeout);
return sendRaw(std::move(httpResponse), yield);
}

std::expected<Request, Error>
receive(boost::asio::yield_context yield, std::chrono::steady_clock::duration timeout = DEFAULT_TIMEOUT) override
receive(boost::asio::yield_context yield) override
{
if (request_.has_value()) {
Request result{std::move(request_).value()};
request_.reset();
return result;
}
auto expectedRequest = fetch(yield, timeout);
auto expectedRequest = fetch(yield);
if (expectedRequest.has_value())
return Request{std::move(expectedRequest).value()};

return std::unexpected{std::move(expectedRequest).error()};
}

void
close(boost::asio::yield_context yield, std::chrono::steady_clock::duration timeout = DEFAULT_TIMEOUT) override
close(boost::asio::yield_context yield) override
{
[[maybe_unused]] boost::system::error_code error;
if constexpr (IsSslTcpStream<StreamType>) {
boost::beast::get_lowest_layer(stream_).expires_after(timeout);
stream_.async_shutdown(yield[error]);
}
if constexpr (IsTcpStream<StreamType>) {
stream_.socket().shutdown(boost::asio::ip::tcp::socket::shutdown_type::shutdown_both, error);
} else {
boost::beast::get_lowest_layer(stream_).socket().shutdown(
boost::asio::ip::tcp::socket::shutdown_type::shutdown_both, error
);
boost::beast::get_lowest_layer(stream_).expires_after(timeout_);
stream_.async_shutdown(yield[error]); // Close the SSL connection gracefully
}
boost::beast::get_lowest_layer(stream_).socket().shutdown(
boost::asio::ip::tcp::socket::shutdown_type::shutdown_both, error
);
}

std::expected<bool, Error>
isUpgradeRequested(boost::asio::yield_context yield, std::chrono::steady_clock::duration timeout = DEFAULT_TIMEOUT)
override
isUpgradeRequested(boost::asio::yield_context yield) override
{
auto expectedRequest = fetch(yield, timeout);
auto expectedRequest = fetch(yield);
if (not expectedRequest.has_value())
return std::unexpected{std::move(expectedRequest).error()};

Expand Down Expand Up @@ -217,11 +208,11 @@ class HttpConnection : public UpgradableConnection {

private:
std::expected<boost::beast::http::request<boost::beast::http::string_body>, Error>
fetch(boost::asio::yield_context yield, std::chrono::steady_clock::duration timeout)
fetch(boost::asio::yield_context yield)
{
boost::beast::http::request<boost::beast::http::string_body> request{};
boost::system::error_code error;
boost::beast::get_lowest_layer(stream_).expires_after(timeout);
boost::beast::get_lowest_layer(stream_).expires_after(timeout_);
boost::beast::http::async_read(stream_, buffer_, request, yield[error]);
if (error)
return std::unexpected{error};
Expand Down
Loading

0 comments on commit 8c86e9a

Please sign in to comment.