Skip to content

Commit

Permalink
Issue #250 - Implement HTTP CONNECT for HTTP/2. (#3539)
Browse files Browse the repository at this point in the history
Fixes #250 - Implement HTTP CONNECT for HTTP/2.

Modified HTTP/2 implementation to support the CONNECT method.
Implemented semantic defined by RFC 8441.
Implemented section 8.3 of RFC 7540.
Introduced HTTP2Client.streamIdleTimeout.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
  • Loading branch information
sbordet committed Aug 13, 2019
1 parent 518906f commit a700907
Show file tree
Hide file tree
Showing 79 changed files with 3,084 additions and 1,006 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public void connect(InetSocketAddress address, Map<String, Object> context)
context.put(ClientConnector.CLIENT_CONNECTION_FACTORY_CONTEXT_KEY, destination.getClientConnectionFactory());
@SuppressWarnings("unchecked")
Promise<Connection> promise = (Promise<Connection>)context.get(HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
context.put(ClientConnector.CONNECTION_PROMISE_CONTEXT_KEY, new Promise.Wrapper<>(promise));
context.put(ClientConnector.CONNECTION_PROMISE_CONTEXT_KEY, promise);
connector.connect(address, context);
}
}
26 changes: 15 additions & 11 deletions jetty-client/src/main/java/org/eclipse/jetty/client/HttpClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -509,13 +509,26 @@ private URI checkHost(URI uri)
* @param port the destination port
* @return the destination
* @see #getDestinations()
* @deprecated use {@link #resolveDestination(Request)} instead
*/
@Deprecated
public Destination getDestination(String scheme, String host, int port)
{
Origin origin = createOrigin(scheme, host, port);
return resolveDestination(new HttpDestination.Key(origin, null));
}

public Destination resolveDestination(Request request)
{
Origin origin = createOrigin(request.getScheme(), request.getHost(), request.getPort());
HttpClientTransport transport = getTransport();
HttpDestination.Key destinationKey = transport.newDestinationKey((HttpRequest)request, origin);
HttpDestination destination = resolveDestination(destinationKey);
if (LOG.isDebugEnabled())
LOG.debug("Resolved {} for {}", destination, request);
return destination;
}

private Origin createOrigin(String scheme, String host, int port)
{
if (!HttpScheme.HTTP.is(scheme) && !HttpScheme.HTTPS.is(scheme) &&
Expand All @@ -529,7 +542,7 @@ private Origin createOrigin(String scheme, String host, int port)
return new Origin(scheme, host, port);
}

private HttpDestination resolveDestination(HttpDestination.Key key)
HttpDestination resolveDestination(HttpDestination.Key key)
{
HttpDestination destination = destinations.get(key);
if (destination == null)
Expand Down Expand Up @@ -568,16 +581,7 @@ public List<Destination> getDestinations()

protected void send(final HttpRequest request, List<Response.ResponseListener> listeners)
{
Origin origin = createOrigin(request.getScheme(), request.getHost(), request.getPort());
HttpClientTransport transport = getTransport();
HttpDestination.Key destinationKey = null;
if (transport instanceof HttpClientTransport.Dynamic)
destinationKey = ((HttpClientTransport.Dynamic)transport).newDestinationKey(request, origin);
if (destinationKey == null)
destinationKey = new HttpDestination.Key(origin, null);
if (LOG.isDebugEnabled())
LOG.debug("Selected {} for {}", destinationKey, request);
HttpDestination destination = resolveDestination(destinationKey);
HttpDestination destination = (HttpDestination)resolveDestination(request);
destination.send(request, listeners);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,15 @@ public interface HttpClientTransport extends ClientConnectionFactory
*/
public void setHttpClient(HttpClient client);

/**
* Creates a new Key with the given request and origin.
*
* @param request the request that triggers the creation of the Key
* @param origin the origin of the server for the request
* @return a Key that identifies a destination
*/
public HttpDestination.Key newDestinationKey(HttpRequest request, Origin origin);

/**
* Creates a new, transport-specific, {@link HttpDestination} object.
* <p>
Expand Down Expand Up @@ -78,20 +87,4 @@ public interface HttpClientTransport extends ClientConnectionFactory
* @param factory the factory for ConnectionPool instances
*/
public void setConnectionPoolFactory(ConnectionPool.Factory factory);

/**
* Specifies whether a {@link HttpClientTransport} is dynamic.
*/
@FunctionalInterface
public interface Dynamic
{
/**
* Creates a new Key with the given request and origin.
*
* @param request the request that triggers the creation of the Key
* @param origin the origin of the server for the request
* @return a Key that identifies a destination
*/
public HttpDestination.Key newDestinationKey(HttpRequest request, Origin origin);
}
}
184 changes: 113 additions & 71 deletions jetty-client/src/main/java/org/eclipse/jetty/client/HttpProxy.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@

import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.Objects;

import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Destination;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.http.HttpConnectionOverHTTP;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpScheme;
Expand All @@ -37,7 +38,6 @@
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.ssl.SslContextFactory;

public class HttpProxy extends ProxyConfiguration.Proxy
{
Expand All @@ -50,7 +50,12 @@ public HttpProxy(String host, int port)

public HttpProxy(Origin.Address address, boolean secure)
{
super(address, secure);
this(address, secure, new HttpDestination.Protocol(List.of("http/1.1"), false));
}

public HttpProxy(Origin.Address address, boolean secure, HttpDestination.Protocol protocol)
{
super(address, secure, Objects.requireNonNull(protocol));
}

@Override
Expand All @@ -61,9 +66,14 @@ public ClientConnectionFactory newClientConnectionFactory(ClientConnectionFactor

@Override
public URI getURI()
{
return URI.create(newOrigin().asString());
}

private Origin newOrigin()
{
String scheme = isSecure() ? HttpScheme.HTTPS.asString() : HttpScheme.HTTP.asString();
return URI.create(new Origin(scheme, getAddress()).asString());
return new Origin(scheme, getAddress());
}

private class HttpProxyClientConnectionFactory implements ClientConnectionFactory
Expand All @@ -79,40 +89,61 @@ private HttpProxyClientConnectionFactory(ClientConnectionFactory connectionFacto
public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map<String, Object> context) throws IOException
{
HttpDestination destination = (HttpDestination)context.get(HttpClientTransport.HTTP_DESTINATION_CONTEXT_KEY);
SslContextFactory sslContextFactory = destination.getHttpClient().getSslContextFactory();
if (destination.isSecure())
HttpDestination.Protocol serverProtocol = destination.getKey().getProtocol();
boolean sameProtocol = proxySpeaksServerProtocol(serverProtocol);
if (destination.isSecure() || !sameProtocol)
{
if (sslContextFactory != null)
@SuppressWarnings("unchecked")
Promise<Connection> promise = (Promise<Connection>)context.get(HttpClientTransport.HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
Promise<Connection> wrapped = promise;
if (promise instanceof Promise.Wrapper)
wrapped = ((Promise.Wrapper<Connection>)promise).unwrap();
if (wrapped instanceof TunnelPromise)
{
@SuppressWarnings("unchecked")
Promise<Connection> promise = (Promise<Connection>)context.get(HttpClientTransport.HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
Promise<Connection> wrapped = promise;
if (promise instanceof Promise.Wrapper)
wrapped = ((Promise.Wrapper<Connection>)promise).unwrap();
if (wrapped instanceof TunnelPromise)
{
((TunnelPromise)wrapped).setEndPoint(endPoint);
return connectionFactory.newConnection(endPoint, context);
}
else
{
// Replace the promise with the proxy promise that creates the tunnel to the server.
CreateTunnelPromise tunnelPromise = new CreateTunnelPromise(connectionFactory, endPoint, promise, context);
context.put(HttpClientTransport.HTTP_CONNECTION_PROMISE_CONTEXT_KEY, tunnelPromise);
return connectionFactory.newConnection(endPoint, context);
}
// In case the server closes the tunnel (e.g. proxy authentication
// required: 407 + Connection: close), we will open another tunnel
// so we need to tell the promise about the new EndPoint.
((TunnelPromise)wrapped).setEndPoint(endPoint);
return connectionFactory.newConnection(endPoint, context);
}
else
{
throw new IOException("Cannot tunnel request, missing " +
SslContextFactory.class.getName() + " in " + HttpClient.class.getName());
return newProxyConnection(endPoint, context);
}
}
else
{
return connectionFactory.newConnection(endPoint, context);
}
}

private boolean proxySpeaksServerProtocol(HttpDestination.Protocol serverProtocol)
{
return serverProtocol != null && getProtocol().getProtocols().stream().anyMatch(p -> serverProtocol.getProtocols().stream().anyMatch(p::equalsIgnoreCase));
}

private org.eclipse.jetty.io.Connection newProxyConnection(EndPoint endPoint, Map<String, Object> context) throws IOException
{
// Replace the promise with the proxy promise that creates the tunnel to the server.
@SuppressWarnings("unchecked")
Promise<Connection> promise = (Promise<Connection>)context.get(HttpClientTransport.HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
CreateTunnelPromise tunnelPromise = new CreateTunnelPromise(connectionFactory, endPoint, promise, context);
context.put(HttpClientTransport.HTTP_CONNECTION_PROMISE_CONTEXT_KEY, tunnelPromise);

// Replace the destination with the proxy destination.
HttpDestination destination = (HttpDestination)context.get(HttpClientTransport.HTTP_DESTINATION_CONTEXT_KEY);
HttpClient client = destination.getHttpClient();
HttpDestination proxyDestination = client.resolveDestination(new HttpDestination.Key(newOrigin(), getProtocol()));
context.put(HttpClientTransport.HTTP_DESTINATION_CONTEXT_KEY, proxyDestination);
try
{
return connectionFactory.newConnection(endPoint, context);
}
finally
{
context.put(HttpClientTransport.HTTP_DESTINATION_CONTEXT_KEY, destination);
}
}
}

/**
Expand All @@ -139,6 +170,8 @@ private CreateTunnelPromise(ClientConnectionFactory connectionFactory, EndPoint
@Override
public void succeeded(Connection connection)
{
// Replace the promise back with the original.
context.put(HttpClientTransport.HTTP_CONNECTION_PROMISE_CONTEXT_KEY, promise);
HttpDestination destination = (HttpDestination)context.get(HttpClientTransport.HTTP_DESTINATION_CONTEXT_KEY);
tunnel(destination, connection);
}
Expand All @@ -154,61 +187,28 @@ private void tunnel(HttpDestination destination, Connection connection)
String target = destination.getOrigin().getAddress().asString();
Origin.Address proxyAddress = destination.getConnectAddress();
HttpClient httpClient = destination.getHttpClient();
long connectTimeout = httpClient.getConnectTimeout();
Request connect = httpClient.newRequest(proxyAddress.getHost(), proxyAddress.getPort())
.method(HttpMethod.CONNECT)
.path(target)
.header(HttpHeader.HOST, target)
.idleTimeout(2 * connectTimeout, TimeUnit.MILLISECONDS)
.timeout(connectTimeout, TimeUnit.MILLISECONDS);
Request connect = new TunnelRequest(httpClient, proxyAddress)
.method(HttpMethod.CONNECT)
.path(target)
.header(HttpHeader.HOST, target);
ProxyConfiguration.Proxy proxy = destination.getProxy();
if (proxy != null && proxy.isSecure())
if (proxy.isSecure())
connect.scheme(HttpScheme.HTTPS.asString());

final HttpConversation conversation = ((HttpRequest)connect).getConversation();
conversation.setAttribute(EndPoint.class.getName(), endPoint);

connect.attribute(Connection.class.getName(), new ProxyConnection(destination, connection, promise));

connection.send(connect, result ->
{
// The EndPoint may have changed during the conversation, get the latest.
EndPoint endPoint = (EndPoint)conversation.getAttribute(EndPoint.class.getName());
if (result.isSucceeded())
{
Response response = result.getResponse();
if (response.getStatus() == HttpStatus.OK_200)
{
tunnelSucceeded(endPoint);
}
else
{
HttpResponseException failure = new HttpResponseException("Unexpected " + response +
" for " + result.getRequest(), response);
tunnelFailed(endPoint, failure);
}
}
else
{
tunnelFailed(endPoint, result.getFailure());
}
});
connection.send(connect, new TunnelListener(connect));
}

private void tunnelSucceeded(EndPoint endPoint)
{
try
{
// Replace the promise back with the original
context.put(HttpClientTransport.HTTP_CONNECTION_PROMISE_CONTEXT_KEY, promise);
HttpDestination destination = (HttpDestination)context.get(HttpClientTransport.HTTP_DESTINATION_CONTEXT_KEY);
HttpClient client = destination.getHttpClient();
ClientConnectionFactory sslConnectionFactory = client.newSslClientConnectionFactory(connectionFactory);
HttpConnectionOverHTTP oldConnection = (HttpConnectionOverHTTP)endPoint.getConnection();
org.eclipse.jetty.io.Connection newConnection = sslConnectionFactory.newConnection(endPoint, context);
// Creating the connection will link the new Connection the EndPoint,
// but we need the old Connection linked for the upgrade to do its job.
endPoint.setConnection(oldConnection);
ClientConnectionFactory connectionFactory = this.connectionFactory;
if (destination.isSecure())
connectionFactory = destination.newSslClientConnectionFactory(connectionFactory);
var oldConnection = endPoint.getConnection();
var newConnection = connectionFactory.newConnection(endPoint, context);
endPoint.upgrade(newConnection);
if (LOG.isDebugEnabled())
LOG.debug("HTTP tunnel established: {} over {}", oldConnection, newConnection);
Expand All @@ -224,6 +224,40 @@ private void tunnelFailed(EndPoint endPoint, Throwable failure)
endPoint.close();
promise.failed(failure);
}

private class TunnelListener extends Response.Listener.Adapter
{
private final HttpConversation conversation;

private TunnelListener(Request request)
{
this.conversation = ((HttpRequest)request).getConversation();
}

@Override
public void onHeaders(Response response)
{
// The EndPoint may have changed during the conversation, get the latest.
EndPoint endPoint = (EndPoint)conversation.getAttribute(EndPoint.class.getName());
if (response.getStatus() == HttpStatus.OK_200)
{
tunnelSucceeded(endPoint);
}
else
{
HttpResponseException failure = new HttpResponseException("Unexpected " + response +
" for " + response.getRequest(), response);
tunnelFailed(endPoint, failure);
}
}

@Override
public void onComplete(Result result)
{
if (result.isFailed())
tunnelFailed(endPoint, result.getFailure());
}
}
}

private class ProxyConnection implements Connection
Expand Down Expand Up @@ -296,4 +330,12 @@ private void setEndPoint(EndPoint endPoint)
conversation.setAttribute(EndPoint.class.getName(), endPoint);
}
}

public static class TunnelRequest extends HttpRequest
{
private TunnelRequest(HttpClient client, Origin.Address address)
{
super(client, new HttpConversation(), URI.create("http://" + address.asString()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,7 @@ public Decoder(ResponseNotifier notifier, HttpResponse response, ContentDecoder
}

@Override
protected Action process() throws Throwable
protected Action process()
{
while (true)
{
Expand Down
Loading

0 comments on commit a700907

Please sign in to comment.