Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support cancellation during client connection establishment #721

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
34 changes: 20 additions & 14 deletions src/aleph/http.clj
Original file line number Diff line number Diff line change
Expand Up @@ -100,20 +100,23 @@
will be errors, and a new connection must be created."
[^URI uri options middleware on-closed]
(let [scheme (.getScheme uri)
ssl? (= "https" scheme)]
(-> (client/http-connection
(InetSocketAddress/createUnresolved
(.getHost uri)
(int
(or
(when (pos? (.getPort uri)) (.getPort uri))
(if ssl? 443 80))))
ssl?
(if on-closed
(assoc options :on-closed on-closed)
options))

(d/chain' middleware))))
ssl? (= "https" scheme)
conn (client/http-connection
(InetSocketAddress/createUnresolved
(.getHost uri)
(int
(or
(when (pos? (.getPort uri)) (.getPort uri))
(if ssl? 443 80))))
ssl?
(if on-closed
(assoc options :on-closed on-closed)
options))]
(doto (d/chain' conn middleware)
(d/catch' (fn [e]
(log/trace e "Terminating creation of HTTP connection")
(d/error! conn e)
(d/error-deferred e))))))
KingMob marked this conversation as resolved.
Show resolved Hide resolved

(def ^:private connection-stats-callbacks (atom #{}))

Expand Down Expand Up @@ -389,6 +392,9 @@
;; function.
(reset! dispose-conn! (fn [] (flow/dispose pool k conn)))

;; allow cancellation during connection establishment
(d/connect result (first conn))
KingMob marked this conversation as resolved.
Show resolved Hide resolved

(if (realized? result)
;; to account for race condition between setting `dispose-conn!`
;; and putting `result` into error state for cancellation
Expand Down
226 changes: 120 additions & 106 deletions src/aleph/http/client.clj
Original file line number Diff line number Diff line change
Expand Up @@ -821,112 +821,126 @@
:local-address local-address
:transport (netty/determine-transport transport epoll?)
:name-resolver name-resolver
:connect-timeout connect-timeout})]

(attach-on-close-handler ch-d on-closed)

(d/chain' ch-d
(fn setup-client
[^Channel ch]
(log/debug "Channel:" ch)

;; We know the SSL handshake must be complete because create-client wraps the
;; future with maybe-ssl-handshake-future, so we can get the negotiated
;; protocol, falling back to HTTP/1.1 by default.
(let [pipeline (.pipeline ch)
protocol (cond
ssl?
(or (-> pipeline
^SslHandler (.get ^Class SslHandler)
(.applicationProtocol))
ApplicationProtocolNames/HTTP_1_1) ; Not using ALPN, HTTP/2 isn't allowed

force-h2c?
(do
(log/info "Forcing HTTP/2 over cleartext. Be sure to do this only with servers you control.")
ApplicationProtocolNames/HTTP_2)

:else
ApplicationProtocolNames/HTTP_1_1) ; Not using SSL, HTTP/2 isn't allowed unless h2c requested
setup-opts (assoc opts
:authority authority
:ch ch
:server? false
:keep-alive? keep-alive?
:keep-alive?' keep-alive?'
:logger logger
:non-tun-proxy? non-tun-proxy?
:pipeline pipeline
:pipeline-transform pipeline-transform
:raw-stream? raw-stream?
:remote-address remote-address
:response-buffer-size response-buffer-size
:ssl-context ssl-context
:ssl? ssl?)]

(log/debug (str "Using HTTP protocol: " protocol)
{:authority authority
:ssl? ssl?
:force-h2c? force-h2c?})

;; can't use ApnHandler, because we need to coordinate with Manifold code
(let [http-req-handler
(cond (.equals ApplicationProtocolNames/HTTP_1_1 protocol)
(setup-http1-client setup-opts)

(.equals ApplicationProtocolNames/HTTP_2 protocol)
(do
(http2/setup-conn-pipeline setup-opts)
(http2-req-handler setup-opts))

:else
(do
(let [msg (str "Unknown protocol: " protocol)
e (IllegalStateException. msg)]
(log/error e msg)
(netty/close ch)
(throw e))))]

;; Both Netty and Aleph are set up, unpause the pipeline
(when (.get pipeline "pause-handler")
(log/debug "Unpausing pipeline")
(.remove pipeline "pause-handler"))

(fn http-req-fn
[req]
(log/trace "http-req-fn fired")
(log/debug "client request:" (pr-str req))

;; If :aleph/close is set in the req, closes the channel and
;; returns a deferred containing the result.
(if (or (contains? req :aleph/close)
(contains? req ::close))
(-> ch (netty/close) (netty/wrap-future))

(let [t0 (System/nanoTime)
;; I suspect the below is an error for http1
;; since the shared handler might not match.
;; Should work for HTTP2, though
raw-stream? (get req :raw-stream? raw-stream?)]

(if (or (not (.isActive ch))
(not (.isOpen ch)))

(d/error-deferred
(ex-info "Channel is inactive/closed."
{:req req
:ch ch
:open? (.isOpen ch)
:active? (.isActive ch)}))

(-> (http-req-handler req)
(d/chain' (rsp-handler
{:ch ch
:keep-alive? keep-alive? ; why not keep-alive?'
:raw-stream? raw-stream?
:req req
:response-buffer-size response-buffer-size
:t0 t0})))))))))))))
:connect-timeout connect-timeout})

_ (attach-on-close-handler ch-d on-closed)
DerGuteMoritz marked this conversation as resolved.
Show resolved Hide resolved

close-ch! (atom (fn []))
result (d/deferred)

conn (d/chain' ch-d
(fn setup-client
[^Channel ch]
(log/debug "Channel:" ch)
(reset! close-ch! (fn [] @(-> (netty/close ch) (netty/wrap-future))))
(if (realized? result)
;; Account for race condition between setting `close-ch!` and putting
;; `result` into error state for cancellation
(@close-ch!)
;; We know the SSL handshake must be complete because create-client wraps the
;; future with maybe-ssl-handshake-future, so we can get the negotiated
;; protocol, falling back to HTTP/1.1 by default.
(let [pipeline (.pipeline ch)
protocol (cond
ssl?
(or (-> pipeline
^SslHandler (.get ^Class SslHandler)
(.applicationProtocol))
ApplicationProtocolNames/HTTP_1_1) ; Not using ALPN, HTTP/2 isn't allowed

force-h2c?
(do
(log/info "Forcing HTTP/2 over cleartext. Be sure to do this only with servers you control.")
ApplicationProtocolNames/HTTP_2)

:else
ApplicationProtocolNames/HTTP_1_1) ; Not using SSL, HTTP/2 isn't allowed unless h2c requested
setup-opts (assoc opts
:authority authority
:ch ch
:server? false
:keep-alive? keep-alive?
:keep-alive?' keep-alive?'
:logger logger
:non-tun-proxy? non-tun-proxy?
:pipeline pipeline
:pipeline-transform pipeline-transform
:raw-stream? raw-stream?
:remote-address remote-address
:response-buffer-size response-buffer-size
:ssl-context ssl-context
:ssl? ssl?)]

(log/debug (str "Using HTTP protocol: " protocol)
{:authority authority
:ssl? ssl?
:force-h2c? force-h2c?})

;; can't use ApnHandler, because we need to coordinate with Manifold code
(let [http-req-handler
(cond (.equals ApplicationProtocolNames/HTTP_1_1 protocol)
(setup-http1-client setup-opts)

(.equals ApplicationProtocolNames/HTTP_2 protocol)
(do
(http2/setup-conn-pipeline setup-opts)
(http2-req-handler setup-opts))

:else
(do
(let [msg (str "Unknown protocol: " protocol)
e (IllegalStateException. msg)]
(log/error e msg)
(netty/close ch)
(throw e))))]

;; Both Netty and Aleph are set up, unpause the pipeline
(when (.get pipeline "pause-handler")
(log/debug "Unpausing pipeline")
(.remove pipeline "pause-handler"))

(fn http-req-fn
[req]
(log/trace "http-req-fn fired")
(log/debug "client request:" (pr-str req))

;; If :aleph/close is set in the req, closes the channel and
;; returns a deferred containing the result.
(if (or (contains? req :aleph/close)
(contains? req ::close))
(-> ch (netty/close) (netty/wrap-future))

(let [t0 (System/nanoTime)
;; I suspect the below is an error for http1
;; since the shared handler might not match.
;; Should work for HTTP2, though
raw-stream? (get req :raw-stream? raw-stream?)]

(if (or (not (.isActive ch))
(not (.isOpen ch)))

(d/error-deferred
(ex-info "Channel is inactive/closed."
{:req req
:ch ch
:open? (.isOpen ch)
:active? (.isActive ch)}))

(-> (http-req-handler req)
(d/chain' (rsp-handler
{:ch ch
:keep-alive? keep-alive? ; why not keep-alive?'
:raw-stream? raw-stream?
:req req
:response-buffer-size response-buffer-size
:t0 t0}))))))))))))]
(d/connect conn result)
(d/catch' result (fn [e]
(log/trace e "Closing HTTP connection channel")
(d/error! ch-d e)
(@close-ch!)
(d/error-deferred e)))
result))



Expand Down
73 changes: 44 additions & 29 deletions src/aleph/netty.clj
Original file line number Diff line number Diff line change
Expand Up @@ -1521,6 +1521,14 @@
(ssl-handler ch ssl-ctx))))
(pipeline-builder p))))

(defn- connect-client
^ChannelFuture [^Bootstrap bootstrap
^SocketAddress remote-address
^SocketAddress local-address]
(if local-address
(.connect bootstrap remote-address local-address)
(.connect bootstrap remote-address)))

(defn ^:no-doc create-client-chan
"Returns a deferred containing a new Channel.

Expand All @@ -1529,8 +1537,8 @@
complete."
[{:keys [pipeline-builder
bootstrap-transform
^SocketAddress remote-address
^SocketAddress local-address
remote-address
local-address
transport
name-resolver
connect-timeout]
Expand All @@ -1543,32 +1551,39 @@
(throw (IllegalArgumentException. "Can't use :ssl-context anymore.")))

(let [^Class chan-class (transport-channel-class transport)
initializer (pipeline-initializer pipeline-builder)]
(try
(let [client-event-loop-group @(transport-client-group transport)
resolver' (when (some? name-resolver)
(cond
(= :default name-resolver) nil
(= :noop name-resolver) NoopAddressResolverGroup/INSTANCE
(instance? AddressResolverGroup name-resolver) name-resolver))
bootstrap (doto (Bootstrap.)
(.option ChannelOption/SO_REUSEADDR true)
(.option ChannelOption/CONNECT_TIMEOUT_MILLIS (int connect-timeout))
#_(.option ChannelOption/MAX_MESSAGES_PER_READ Integer/MAX_VALUE) ; option deprecated, removed in v5
(.group client-event-loop-group)
(.channel chan-class)
(.handler initializer)
(.resolver resolver')
bootstrap-transform)

fut (if local-address
(.connect bootstrap remote-address local-address)
(.connect bootstrap remote-address))]

(d/chain' (wrap-future fut)
(fn [_]
(let [ch (.channel ^ChannelFuture fut)]
(maybe-ssl-handshake-future ch))))))))
initializer (pipeline-initializer pipeline-builder)
client-event-loop-group @(transport-client-group transport)
resolver' (when (some? name-resolver)
(cond
(= :default name-resolver) nil
(= :noop name-resolver) NoopAddressResolverGroup/INSTANCE
(instance? AddressResolverGroup name-resolver) name-resolver))
bootstrap (doto (Bootstrap.)
(.option ChannelOption/SO_REUSEADDR true)
(.option ChannelOption/CONNECT_TIMEOUT_MILLIS (int connect-timeout))
#_(.option ChannelOption/MAX_MESSAGES_PER_READ Integer/MAX_VALUE) ; option deprecated, removed in v5
(.group client-event-loop-group)
(.channel chan-class)
(.handler initializer)
(.resolver resolver')
bootstrap-transform)

fut (connect-client bootstrap remote-address local-address)]
(doto (-> (wrap-future fut)
(d/chain'
(fn [_]
(let [ch (.channel ^ChannelFuture fut)]
(maybe-ssl-handshake-future ch)))))
(d/catch' (fn [e]
(when-not (.isDone fut)
(log/trace e "Cancelling Bootstrap#connect future")
(when-not (.cancel fut true)
(when-not (.isDone fut)
(log/warn "Transport" transport "does not support cancellation of connection attempts."
"Instead, you have to wait for the connect timeout to expire for it to be terminated."
"Its current value is" connect-timeout "ms."
"It can be set via the `connect-timeout` option."))))
(d/error-deferred e))))))
KingMob marked this conversation as resolved.
Show resolved Hide resolved


(defn ^:no-doc ^:deprecated create-client
Expand Down Expand Up @@ -1732,7 +1747,7 @@
(fn [shutdown-output]
(when (= shutdown-output ::timeout)
(log/error
(format "Timeout while waiting for requests to close (exceeded: %ss)"
(format "Timeout while waiting for connections to close (exceeded: %ss)"
shutdown-timeout)))))
(d/finally'
;; 3. At this stage, stop the EventLoopGroup, this will cancel any
Expand Down
Loading