From 4923350ef78eab7f1eda0474716094a250f3a71a Mon Sep 17 00:00:00 2001 From: Moritz Heidkamp Date: Thu, 15 Feb 2024 12:18:13 +0100 Subject: [PATCH] Support cancellation during client connection establishment With #714 we added support for cancelling in-flight HTTP requests by putting the response deferred into an error state. However, this only worked once the underlying TCP connection was established. With this patch, it is now possible to cancel requests even while the connection is still being established (possible since Netty 4.1.108.Final via https://github.com/netty/netty/pull/13849). This also works for `aleph.tcp/client`. --- src/aleph/http.clj | 34 +++--- src/aleph/http/client.clj | 226 ++++++++++++++++++++------------------ src/aleph/netty.clj | 73 +++++++----- src/aleph/tcp.clj | 26 +++-- test/aleph/http_test.clj | 30 ++++- test/aleph/tcp_test.clj | 26 ++++- test/aleph/testutils.clj | 29 ++++- 7 files changed, 282 insertions(+), 162 deletions(-) diff --git a/src/aleph/http.clj b/src/aleph/http.clj index 06b70587..7a471844 100644 --- a/src/aleph/http.clj +++ b/src/aleph/http.clj @@ -100,20 +100,23 @@ will be errors, and a new connection must be created." [^URI uri options middleware on-closed] (let [scheme (.getScheme uri) - ssl? (= "https" scheme)] - (-> (client/http-connection - (InetSocketAddress/createUnresolved - (.getHost uri) - (int - (or - (when (pos? (.getPort uri)) (.getPort uri)) - (if ssl? 443 80)))) - ssl? - (if on-closed - (assoc options :on-closed on-closed) - options)) - - (d/chain' middleware)))) + ssl? (= "https" scheme) + conn (client/http-connection + (InetSocketAddress/createUnresolved + (.getHost uri) + (int + (or + (when (pos? (.getPort uri)) (.getPort uri)) + (if ssl? 443 80)))) + ssl? + (if on-closed + (assoc options :on-closed on-closed) + options))] + (doto (d/chain' conn middleware) + (d/catch' (fn [e] + (log/trace e "Terminating creation of HTTP connection") + (d/error! conn e) + (d/error-deferred e)))))) (def ^:private connection-stats-callbacks (atom #{})) @@ -389,6 +392,9 @@ ;; function. (reset! dispose-conn! (fn [] (flow/dispose pool k conn))) + ;; allow cancellation during connection establishment + (d/connect result (first conn)) + (if (realized? result) ;; to account for race condition between setting `dispose-conn!` ;; and putting `result` into error state for cancellation diff --git a/src/aleph/http/client.clj b/src/aleph/http/client.clj index 9b4e252a..2e08f641 100644 --- a/src/aleph/http/client.clj +++ b/src/aleph/http/client.clj @@ -821,112 +821,126 @@ :local-address local-address :transport (netty/determine-transport transport epoll?) :name-resolver name-resolver - :connect-timeout connect-timeout})] - - (attach-on-close-handler ch-d on-closed) - - (d/chain' ch-d - (fn setup-client - [^Channel ch] - (log/debug "Channel:" ch) - - ;; We know the SSL handshake must be complete because create-client wraps the - ;; future with maybe-ssl-handshake-future, so we can get the negotiated - ;; protocol, falling back to HTTP/1.1 by default. - (let [pipeline (.pipeline ch) - protocol (cond - ssl? - (or (-> pipeline - ^SslHandler (.get ^Class SslHandler) - (.applicationProtocol)) - ApplicationProtocolNames/HTTP_1_1) ; Not using ALPN, HTTP/2 isn't allowed - - force-h2c? - (do - (log/info "Forcing HTTP/2 over cleartext. Be sure to do this only with servers you control.") - ApplicationProtocolNames/HTTP_2) - - :else - ApplicationProtocolNames/HTTP_1_1) ; Not using SSL, HTTP/2 isn't allowed unless h2c requested - setup-opts (assoc opts - :authority authority - :ch ch - :server? false - :keep-alive? keep-alive? - :keep-alive?' keep-alive?' - :logger logger - :non-tun-proxy? non-tun-proxy? - :pipeline pipeline - :pipeline-transform pipeline-transform - :raw-stream? raw-stream? - :remote-address remote-address - :response-buffer-size response-buffer-size - :ssl-context ssl-context - :ssl? ssl?)] - - (log/debug (str "Using HTTP protocol: " protocol) - {:authority authority - :ssl? ssl? - :force-h2c? force-h2c?}) - - ;; can't use ApnHandler, because we need to coordinate with Manifold code - (let [http-req-handler - (cond (.equals ApplicationProtocolNames/HTTP_1_1 protocol) - (setup-http1-client setup-opts) - - (.equals ApplicationProtocolNames/HTTP_2 protocol) - (do - (http2/setup-conn-pipeline setup-opts) - (http2-req-handler setup-opts)) - - :else - (do - (let [msg (str "Unknown protocol: " protocol) - e (IllegalStateException. msg)] - (log/error e msg) - (netty/close ch) - (throw e))))] - - ;; Both Netty and Aleph are set up, unpause the pipeline - (when (.get pipeline "pause-handler") - (log/debug "Unpausing pipeline") - (.remove pipeline "pause-handler")) - - (fn http-req-fn - [req] - (log/trace "http-req-fn fired") - (log/debug "client request:" (pr-str req)) - - ;; If :aleph/close is set in the req, closes the channel and - ;; returns a deferred containing the result. - (if (or (contains? req :aleph/close) - (contains? req ::close)) - (-> ch (netty/close) (netty/wrap-future)) - - (let [t0 (System/nanoTime) - ;; I suspect the below is an error for http1 - ;; since the shared handler might not match. - ;; Should work for HTTP2, though - raw-stream? (get req :raw-stream? raw-stream?)] - - (if (or (not (.isActive ch)) - (not (.isOpen ch))) - - (d/error-deferred - (ex-info "Channel is inactive/closed." - {:req req - :ch ch - :open? (.isOpen ch) - :active? (.isActive ch)})) - - (-> (http-req-handler req) - (d/chain' (rsp-handler - {:ch ch - :keep-alive? keep-alive? ; why not keep-alive?' - :raw-stream? raw-stream? - :req req - :response-buffer-size response-buffer-size - :t0 t0}))))))))))))) + :connect-timeout connect-timeout}) + + _ (attach-on-close-handler ch-d on-closed) + + close-ch! (atom (fn [])) + result (d/deferred) + + conn (d/chain' ch-d + (fn setup-client + [^Channel ch] + (log/debug "Channel:" ch) + (reset! close-ch! (fn [] @(-> (netty/close ch) (netty/wrap-future)))) + (if (realized? result) + ;; Account for race condition between setting `close-ch!` and putting + ;; `result` into error state for cancellation + (@close-ch!) + ;; We know the SSL handshake must be complete because create-client wraps the + ;; future with maybe-ssl-handshake-future, so we can get the negotiated + ;; protocol, falling back to HTTP/1.1 by default. + (let [pipeline (.pipeline ch) + protocol (cond + ssl? + (or (-> pipeline + ^SslHandler (.get ^Class SslHandler) + (.applicationProtocol)) + ApplicationProtocolNames/HTTP_1_1) ; Not using ALPN, HTTP/2 isn't allowed + + force-h2c? + (do + (log/info "Forcing HTTP/2 over cleartext. Be sure to do this only with servers you control.") + ApplicationProtocolNames/HTTP_2) + + :else + ApplicationProtocolNames/HTTP_1_1) ; Not using SSL, HTTP/2 isn't allowed unless h2c requested + setup-opts (assoc opts + :authority authority + :ch ch + :server? false + :keep-alive? keep-alive? + :keep-alive?' keep-alive?' + :logger logger + :non-tun-proxy? non-tun-proxy? + :pipeline pipeline + :pipeline-transform pipeline-transform + :raw-stream? raw-stream? + :remote-address remote-address + :response-buffer-size response-buffer-size + :ssl-context ssl-context + :ssl? ssl?)] + + (log/debug (str "Using HTTP protocol: " protocol) + {:authority authority + :ssl? ssl? + :force-h2c? force-h2c?}) + + ;; can't use ApnHandler, because we need to coordinate with Manifold code + (let [http-req-handler + (cond (.equals ApplicationProtocolNames/HTTP_1_1 protocol) + (setup-http1-client setup-opts) + + (.equals ApplicationProtocolNames/HTTP_2 protocol) + (do + (http2/setup-conn-pipeline setup-opts) + (http2-req-handler setup-opts)) + + :else + (do + (let [msg (str "Unknown protocol: " protocol) + e (IllegalStateException. msg)] + (log/error e msg) + (netty/close ch) + (throw e))))] + + ;; Both Netty and Aleph are set up, unpause the pipeline + (when (.get pipeline "pause-handler") + (log/debug "Unpausing pipeline") + (.remove pipeline "pause-handler")) + + (fn http-req-fn + [req] + (log/trace "http-req-fn fired") + (log/debug "client request:" (pr-str req)) + + ;; If :aleph/close is set in the req, closes the channel and + ;; returns a deferred containing the result. + (if (or (contains? req :aleph/close) + (contains? req ::close)) + (-> ch (netty/close) (netty/wrap-future)) + + (let [t0 (System/nanoTime) + ;; I suspect the below is an error for http1 + ;; since the shared handler might not match. + ;; Should work for HTTP2, though + raw-stream? (get req :raw-stream? raw-stream?)] + + (if (or (not (.isActive ch)) + (not (.isOpen ch))) + + (d/error-deferred + (ex-info "Channel is inactive/closed." + {:req req + :ch ch + :open? (.isOpen ch) + :active? (.isActive ch)})) + + (-> (http-req-handler req) + (d/chain' (rsp-handler + {:ch ch + :keep-alive? keep-alive? ; why not keep-alive?' + :raw-stream? raw-stream? + :req req + :response-buffer-size response-buffer-size + :t0 t0}))))))))))))] + (d/connect conn result) + (d/catch' result (fn [e] + (log/trace e "Closing HTTP connection channel") + (d/error! ch-d e) + (@close-ch!) + (d/error-deferred e))) + result)) diff --git a/src/aleph/netty.clj b/src/aleph/netty.clj index 73fc1b6e..cdf203f7 100644 --- a/src/aleph/netty.clj +++ b/src/aleph/netty.clj @@ -1521,6 +1521,14 @@ (ssl-handler ch ssl-ctx)))) (pipeline-builder p)))) +(defn- connect-client + ^ChannelFuture [^Bootstrap bootstrap + ^SocketAddress remote-address + ^SocketAddress local-address] + (if local-address + (.connect bootstrap remote-address local-address) + (.connect bootstrap remote-address))) + (defn ^:no-doc create-client-chan "Returns a deferred containing a new Channel. @@ -1529,8 +1537,8 @@ complete." [{:keys [pipeline-builder bootstrap-transform - ^SocketAddress remote-address - ^SocketAddress local-address + remote-address + local-address transport name-resolver connect-timeout] @@ -1543,32 +1551,39 @@ (throw (IllegalArgumentException. "Can't use :ssl-context anymore."))) (let [^Class chan-class (transport-channel-class transport) - initializer (pipeline-initializer pipeline-builder)] - (try - (let [client-event-loop-group @(transport-client-group transport) - resolver' (when (some? name-resolver) - (cond - (= :default name-resolver) nil - (= :noop name-resolver) NoopAddressResolverGroup/INSTANCE - (instance? AddressResolverGroup name-resolver) name-resolver)) - bootstrap (doto (Bootstrap.) - (.option ChannelOption/SO_REUSEADDR true) - (.option ChannelOption/CONNECT_TIMEOUT_MILLIS (int connect-timeout)) - #_(.option ChannelOption/MAX_MESSAGES_PER_READ Integer/MAX_VALUE) ; option deprecated, removed in v5 - (.group client-event-loop-group) - (.channel chan-class) - (.handler initializer) - (.resolver resolver') - bootstrap-transform) - - fut (if local-address - (.connect bootstrap remote-address local-address) - (.connect bootstrap remote-address))] - - (d/chain' (wrap-future fut) - (fn [_] - (let [ch (.channel ^ChannelFuture fut)] - (maybe-ssl-handshake-future ch)))))))) + initializer (pipeline-initializer pipeline-builder) + client-event-loop-group @(transport-client-group transport) + resolver' (when (some? name-resolver) + (cond + (= :default name-resolver) nil + (= :noop name-resolver) NoopAddressResolverGroup/INSTANCE + (instance? AddressResolverGroup name-resolver) name-resolver)) + bootstrap (doto (Bootstrap.) + (.option ChannelOption/SO_REUSEADDR true) + (.option ChannelOption/CONNECT_TIMEOUT_MILLIS (int connect-timeout)) + #_(.option ChannelOption/MAX_MESSAGES_PER_READ Integer/MAX_VALUE) ; option deprecated, removed in v5 + (.group client-event-loop-group) + (.channel chan-class) + (.handler initializer) + (.resolver resolver') + bootstrap-transform) + + fut (connect-client bootstrap remote-address local-address)] + (doto (-> (wrap-future fut) + (d/chain' + (fn [_] + (let [ch (.channel ^ChannelFuture fut)] + (maybe-ssl-handshake-future ch))))) + (d/catch' (fn [e] + (when-not (.isDone fut) + (log/trace e "Cancelling Bootstrap#connect future") + (when-not (.cancel fut true) + (when-not (.isDone fut) + (log/warn "Transport" transport "does not support cancellation of connection attempts." + "Instead, you have to wait for the connect timeout to expire for it to be terminated." + "Its current value is" connect-timeout "ms." + "It can be set via the `connect-timeout` option.")))) + (d/error-deferred e)))))) (defn ^:no-doc ^:deprecated create-client @@ -1732,7 +1747,7 @@ (fn [shutdown-output] (when (= shutdown-output ::timeout) (log/error - (format "Timeout while waiting for requests to close (exceeded: %ss)" + (format "Timeout while waiting for connections to close (exceeded: %ss)" shutdown-timeout))))) (d/finally' ;; 3. At this stage, stop the EventLoopGroup, this will cancel any diff --git a/src/aleph/tcp.clj b/src/aleph/tcp.clj index 1d66cb4a..8192a901 100644 --- a/src/aleph/tcp.clj +++ b/src/aleph/tcp.clj @@ -155,6 +155,11 @@ "Given a host and port, returns a deferred which yields a duplex stream that can be used to communicate with the server. + Closing the stream will also close the underlying connection. + + Putting the returned deferred into an error state before it yielded the stream will cancel an + in-flight connection attempt. + Param key | Description | --- | --- | `host` | the hostname of the server. @@ -204,13 +209,16 @@ (netty/ssl-handler (.channel pipeline) ssl-context remote-address ssl-endpoint-id-alg))) (.addLast pipeline "handler" handler) (when pipeline-transform - (pipeline-transform pipeline)))] - (-> (netty/create-client-chan - {:pipeline-builder pipeline-builder - :bootstrap-transform bootstrap-transform - :remote-address remote-address - :local-address local-address - :transport (netty/determine-transport transport epoll?) - :connect-timeout connect-timeout}) - (d/catch' #(d/error! s %))) + (pipeline-transform pipeline))) + ch-d (netty/create-client-chan + {:pipeline-builder pipeline-builder + :bootstrap-transform bootstrap-transform + :remote-address remote-address + :local-address local-address + :transport (netty/determine-transport transport epoll?) + :connect-timeout connect-timeout})] + (d/catch' ch-d #(d/error! s %)) + (d/catch' s (fn [e] + (d/error! ch-d e) + (d/error-deferred e))) s)) diff --git a/test/aleph/http_test.clj b/test/aleph/http_test.clj index b6ed9f7e..8c412552 100644 --- a/test/aleph/http_test.clj +++ b/test/aleph/http_test.clj @@ -7,7 +7,7 @@ [aleph.resource-leak-detector] [aleph.ssl :as test-ssl] [aleph.tcp :as tcp] - [aleph.testutils :refer [str=]] + [aleph.testutils :refer [passive-tcp-server str=]] [clj-commons.byte-streams :as bs] [clojure.java.io :as io] [clojure.string :as str] @@ -1451,6 +1451,34 @@ (is (instance? IllegalArgumentException result)) (is (= "use-h2c? may only be true when HTTP/2 is enabled." (ex-message result)))))) +(deftest test-request-cancellation-during-connection-acquisition + (let [starved-pool (http/connection-pool + {:total-connections 0})] + (try + (let [rsp (http-get "/" {:pool starved-pool + :pool-timeout 500})] + (http/cancel-request! rsp) + (is (thrown? RequestCancellationException (deref rsp 0 :timeout)))) + (finally + (.shutdown ^Pool starved-pool))))) + +(deftest test-request-cancellation-during-connection-establishment + (let [connect-client @#'aleph.netty/connect-client + connect-future (promise)] + (with-redefs [aleph.netty/connect-client (fn [& args] + (let [fut (apply connect-client args)] + (deliver connect-future fut) + fut))] + (with-server (passive-tcp-server port) + (let [rsp (http-get "/")] + (is (some? (deref connect-future 1000 nil))) + (http/cancel-request! rsp) + (is (thrown? RequestCancellationException (deref rsp 1000 :timeout))) + (some-> @connect-future (.await 2000 TimeUnit/MILLISECONDS)) + (is (some-> @connect-future .isSuccess false?)) + (is (some-> @connect-future .isDone)) + (is (some-> @connect-future .isCancelled))))))) + (deftest test-in-flight-request-cancellation (let [conn-established (promise) conn-closed (promise)] diff --git a/test/aleph/tcp_test.clj b/test/aleph/tcp_test.clj index 11e26881..875c3470 100644 --- a/test/aleph/tcp_test.clj +++ b/test/aleph/tcp_test.clj @@ -3,9 +3,13 @@ [aleph.netty :as netty] [aleph.resource-leak-detector] [aleph.tcp :as tcp] + [aleph.testutils :refer [passive-tcp-server]] [clj-commons.byte-streams :as bs] - [clojure.test :refer [deftest testing is]] - [manifold.stream :as s])) + [clojure.test :refer [deftest is testing]] + [manifold.deferred :as d] + [manifold.stream :as s]) + (:import + (java.util.concurrent TimeUnit))) (defn echo-handler [s _] (s/connect s s)) @@ -55,4 +59,22 @@ (catch Exception _ (is (not (netty/io-uring-available?))))))) +(deftest test-cancellation-during-connection-establishment + (let [connect-client @#'aleph.netty/connect-client + connect-future (promise) + server (passive-tcp-server 0)] + (with-redefs [aleph.netty/connect-client (fn [& args] + (let [fut (apply connect-client args)] + (deliver connect-future fut) + fut))] + (with-server server + (let [c (tcp/client {:host "localhost" + :port (netty/port server)})] + (is (some? (deref connect-future 1000 nil))) + (d/timeout! c 10) + (some-> @connect-future (.await 2000 TimeUnit/MILLISECONDS)) + (is (some-> @connect-future .isSuccess false?)) + (is (some-> @connect-future .isDone)) + (is (some-> @connect-future .isCancelled))))))) + (aleph.resource-leak-detector/instrument-tests!) diff --git a/test/aleph/testutils.clj b/test/aleph/testutils.clj index f225c2c0..1a91a7f6 100644 --- a/test/aleph/testutils.clj +++ b/test/aleph/testutils.clj @@ -1,8 +1,35 @@ (ns aleph.testutils - (:import (io.netty.util AsciiString))) + (:require + [aleph.netty :as netty]) + (:import + (io.netty.util AsciiString) + (java.io Closeable) + (java.net ServerSocket Socket))) (defn str= "AsciiString-aware equals" [^CharSequence x ^CharSequence y] (AsciiString/contentEquals x y)) +(defn passive-tcp-server + "Starts a TCP server which never accepts a connection." + [port] + (let [;; A backlog of 0 would be ideal for this purpose but: "The value provided should be greater + ;; than 0. If it is less than or equal to 0, then an implementation specific default will be + ;; used." Source: + ;; https://docs.oracle.com/en%2Fjava%2Fjavase%2F21%2Fdocs%2Fapi%2F%2F/java.base/java/net/ServerSocket.html#%3Cinit%3E(int,int) + backlog 1 + server (ServerSocket. port backlog) + port (.getLocalPort server) + ;; Fill up the backlog with pending connection attempts. For some reason, the backlog length + ;; is off by one, thus the `inc`. + pending-connects (doall (repeatedly (inc backlog) #(Socket. "localhost" (int port))))] + (reify + netty/AlephServer + (port [_] port) + (wait-for-close [_] + true) + Closeable + (close [_] + (run! #(.close %) pending-connects) + (.close server)))))