Skip to content

Commit

Permalink
First draft pass at the implementation of CONNECT for HTTP2.
Browse files Browse the repository at this point in the history
Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
  • Loading branch information
sbordet committed Apr 30, 2019
1 parent 66b34f8 commit 7743708
Show file tree
Hide file tree
Showing 28 changed files with 1,103 additions and 191 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,7 @@ private Origin createOrigin(String scheme, String host, int port)
return new Origin(scheme, host, port);
}

private HttpDestination resolveDestination(HttpDestination.Key key)
HttpDestination resolveDestination(HttpDestination.Key key)
{
HttpDestination destination = destinations.get(key);
if (destination == null)
Expand Down
160 changes: 103 additions & 57 deletions jetty-client/src/main/java/org/eclipse/jetty/client/HttpProxy.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,16 @@

import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Destination;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.http.HttpConnectionOverHTTP;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpScheme;
Expand All @@ -37,7 +39,6 @@
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.ssl.SslContextFactory;

public class HttpProxy extends ProxyConfiguration.Proxy
{
Expand All @@ -50,7 +51,12 @@ public HttpProxy(String host, int port)

public HttpProxy(Origin.Address address, boolean secure)
{
super(address, secure);
this(address, secure, new HttpDestination.Protocol(List.of("http/1.1"), false));
}

public HttpProxy(Origin.Address address, boolean secure, HttpDestination.Protocol protocol)
{
super(address, secure, Objects.requireNonNull(protocol));
}

@Override
Expand All @@ -61,9 +67,14 @@ public ClientConnectionFactory newClientConnectionFactory(ClientConnectionFactor

@Override
public URI getURI()
{
return URI.create(newOrigin().asString());
}

private Origin newOrigin()
{
String scheme = isSecure() ? HttpScheme.HTTPS.asString() : HttpScheme.HTTP.asString();
return URI.create(new Origin(scheme, getAddress()).asString());
return new Origin(scheme, getAddress());
}

private class HttpProxyClientConnectionFactory implements ClientConnectionFactory
Expand All @@ -79,40 +90,60 @@ private HttpProxyClientConnectionFactory(ClientConnectionFactory connectionFacto
public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map<String, Object> context) throws IOException
{
HttpDestination destination = (HttpDestination)context.get(HttpClientTransport.HTTP_DESTINATION_CONTEXT_KEY);
SslContextFactory sslContextFactory = destination.getHttpClient().getSslContextFactory();
if (destination.isSecure())
HttpDestination.Protocol serverProtocol = destination.getKey().getProtocol();
boolean sameProtocol = proxySpeaksServerProtocol(serverProtocol);
if (destination.isSecure() || !sameProtocol)
{
if (sslContextFactory != null)
@SuppressWarnings("unchecked")
Promise<Connection> promise = (Promise<Connection>)context.get(HttpClientTransport.HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
Promise<Connection> wrapped = promise;
if (promise instanceof Promise.Wrapper)
wrapped = ((Promise.Wrapper<Connection>)promise).unwrap();
if (wrapped instanceof TunnelPromise)
{
@SuppressWarnings("unchecked")
Promise<Connection> promise = (Promise<Connection>)context.get(HttpClientTransport.HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
Promise<Connection> wrapped = promise;
if (promise instanceof Promise.Wrapper)
wrapped = ((Promise.Wrapper<Connection>)promise).unwrap();
if (wrapped instanceof TunnelPromise)
{
((TunnelPromise)wrapped).setEndPoint(endPoint);
return connectionFactory.newConnection(endPoint, context);
}
else
{
// Replace the promise with the proxy promise that creates the tunnel to the server.
CreateTunnelPromise tunnelPromise = new CreateTunnelPromise(connectionFactory, endPoint, promise, context);
context.put(HttpClientTransport.HTTP_CONNECTION_PROMISE_CONTEXT_KEY, tunnelPromise);
return connectionFactory.newConnection(endPoint, context);
}
// TODO: this is very so weird that it deserves a comment.
// I don't even know when this happens.
((TunnelPromise)wrapped).setEndPoint(endPoint);
return connectionFactory.newConnection(endPoint, context);
}
else
{
throw new IOException("Cannot tunnel request, missing " +
SslContextFactory.class.getName() + " in " + HttpClient.class.getName());
return newProxyConnection(endPoint, context);
}
}
else
{
return connectionFactory.newConnection(endPoint, context);
}
}

private boolean proxySpeaksServerProtocol(HttpDestination.Protocol serverProtocol)
{
return serverProtocol != null && getProtocol().getProtocols().stream().anyMatch(p -> serverProtocol.getProtocols().stream().anyMatch(p::equalsIgnoreCase));
}

private org.eclipse.jetty.io.Connection newProxyConnection(EndPoint endPoint, Map<String, Object> context) throws IOException
{
// Replace the promise with the proxy promise that creates the tunnel to the server.
@SuppressWarnings("unchecked")
Promise<Connection> promise = (Promise<Connection>)context.get(HttpClientTransport.HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
CreateTunnelPromise tunnelPromise = new CreateTunnelPromise(connectionFactory, endPoint, promise, context);
context.put(HttpClientTransport.HTTP_CONNECTION_PROMISE_CONTEXT_KEY, tunnelPromise);

// Replace the destination with the proxy destination.
HttpDestination destination = (HttpDestination)context.get(HttpClientTransport.HTTP_DESTINATION_CONTEXT_KEY);
HttpClient client = destination.getHttpClient();
HttpDestination proxyDestination = client.resolveDestination(new HttpDestination.Key(newOrigin(), getProtocol()));
context.put(HttpClientTransport.HTTP_DESTINATION_CONTEXT_KEY, proxyDestination);
try
{
return connectionFactory.newConnection(endPoint, context);
}
finally
{
context.put(HttpClientTransport.HTTP_DESTINATION_CONTEXT_KEY, destination);
}
}
}

/**
Expand All @@ -139,6 +170,8 @@ private CreateTunnelPromise(ClientConnectionFactory connectionFactory, EndPoint
@Override
public void succeeded(Connection connection)
{
// Replace the promise back with the original.
context.put(HttpClientTransport.HTTP_CONNECTION_PROMISE_CONTEXT_KEY, promise);
HttpDestination destination = (HttpDestination)context.get(HttpClientTransport.HTTP_DESTINATION_CONTEXT_KEY);
tunnel(destination, connection);
}
Expand All @@ -162,50 +195,28 @@ private void tunnel(HttpDestination destination, Connection connection)
.idleTimeout(2 * connectTimeout, TimeUnit.MILLISECONDS)
.timeout(connectTimeout, TimeUnit.MILLISECONDS);
ProxyConfiguration.Proxy proxy = destination.getProxy();
if (proxy != null && proxy.isSecure())
if (proxy.isSecure())
connect.scheme(HttpScheme.HTTPS.asString());

final HttpConversation conversation = ((HttpRequest)connect).getConversation();
HttpConversation conversation = ((HttpRequest)connect).getConversation();
conversation.setAttribute(EndPoint.class.getName(), endPoint);

connect.attribute(Connection.class.getName(), new ProxyConnection(destination, connection, promise));

connection.send(connect, result ->
{
// The EndPoint may have changed during the conversation, get the latest.
EndPoint endPoint = (EndPoint)conversation.getAttribute(EndPoint.class.getName());
if (result.isSucceeded())
{
Response response = result.getResponse();
if (response.getStatus() == HttpStatus.OK_200)
{
tunnelSucceeded(endPoint);
}
else
{
HttpResponseException failure = new HttpResponseException("Unexpected " + response +
" for " + result.getRequest(), response);
tunnelFailed(endPoint, failure);
}
}
else
{
tunnelFailed(endPoint, result.getFailure());
}
});
connection.send(connect, new TunnelListener(conversation));
}

private void tunnelSucceeded(EndPoint endPoint)
{
try
{
// Replace the promise back with the original
context.put(HttpClientTransport.HTTP_CONNECTION_PROMISE_CONTEXT_KEY, promise);
HttpDestination destination = (HttpDestination)context.get(HttpClientTransport.HTTP_DESTINATION_CONTEXT_KEY);
HttpClient client = destination.getHttpClient();
ClientConnectionFactory sslConnectionFactory = client.newSslClientConnectionFactory(connectionFactory);
HttpConnectionOverHTTP oldConnection = (HttpConnectionOverHTTP)endPoint.getConnection();
org.eclipse.jetty.io.Connection newConnection = sslConnectionFactory.newConnection(endPoint, context);
ClientConnectionFactory connectionFactory = this.connectionFactory;
if (destination.isSecure())
connectionFactory = destination.newSslClientConnectionFactory(connectionFactory);
var oldConnection = endPoint.getConnection();
var newConnection = connectionFactory.newConnection(endPoint, context);
// TODO: the comment below is outdated: we only create the connection and not link it to the endPoint.
// Creating the connection will link the new Connection the EndPoint,
// but we need the old Connection linked for the upgrade to do its job.
endPoint.setConnection(oldConnection);
Expand All @@ -224,6 +235,41 @@ private void tunnelFailed(EndPoint endPoint, Throwable failure)
endPoint.close();
promise.failed(failure);
}

private class TunnelListener extends Response.Listener.Adapter
{
private final HttpConversation conversation;

private TunnelListener(HttpConversation conversation)
{
this.conversation = conversation;
}

@Override
public void onHeaders(Response response)
{
// The EndPoint may have changed during the conversation, get the latest.
EndPoint endPoint = (EndPoint)conversation.getAttribute(EndPoint.class.getName());
if (response.getStatus() == HttpStatus.OK_200)
{
tunnelSucceeded(endPoint);
}
else
{
HttpResponseException failure = new HttpResponseException("Unexpected " + response +
" for " + response.getRequest(), response);
tunnelFailed(endPoint, failure);
}
}

@Override
public void onComplete(Result result)
{
// TODO: do we need this? For timeouts, I/O failures, etc?
if (!result.isSucceeded())
tunnelFailed(endPoint, result.getFailure());
}
}
}

private class ProxyConnection implements Connection
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public class HttpRequest implements Request
private String query;
private String method = HttpMethod.GET.asString();
private HttpVersion version = HttpVersion.HTTP_1_1;
private boolean versionExplicit;
private long idleTimeout = -1;
private long timeout;
private long timeoutAt;
Expand Down Expand Up @@ -215,10 +216,16 @@ public HttpVersion getVersion()
return version;
}

public boolean isVersionExplicit()
{
return versionExplicit;
}

@Override
public Request version(HttpVersion version)
{
this.version = Objects.requireNonNull(version);
this.versionExplicit = true;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,13 @@ public static abstract class Proxy
private final Set<String> excluded = new HashSet<>();
private final Origin.Address address;
private final boolean secure;
private final HttpDestination.Protocol protocol;

protected Proxy(Origin.Address address, boolean secure)
protected Proxy(Origin.Address address, boolean secure, HttpDestination.Protocol protocol)
{
this.address = address;
this.secure = secure;
this.protocol = protocol;
}

/**
Expand All @@ -87,6 +89,11 @@ public boolean isSecure()
return secure;
}

public HttpDestination.Protocol getProtocol()
{
return protocol;
}

/**
* @return the list of origins that must be proxied
* @see #matches(Origin)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public Socks4Proxy(String host, int port)

public Socks4Proxy(Origin.Address address, boolean secure)
{
super(address, secure);
super(address, secure, null);
}

@Override
Expand All @@ -64,7 +64,7 @@ public Socks4ProxyClientConnectionFactory(ClientConnectionFactory connectionFact
}

@Override
public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map<String, Object> context) throws IOException
public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map<String, Object> context)
{
HttpDestination destination = (HttpDestination)context.get(HttpClientTransport.HTTP_DESTINATION_CONTEXT_KEY);
Executor executor = destination.getHttpClient().getExecutor();
Expand Down Expand Up @@ -193,10 +193,9 @@ private void tunnel()
try
{
HttpDestination destination = (HttpDestination)context.get(HttpClientTransport.HTTP_DESTINATION_CONTEXT_KEY);
HttpClient client = destination.getHttpClient();
ClientConnectionFactory connectionFactory = this.connectionFactory;
if (destination.isSecure())
connectionFactory = client.newSslClientConnectionFactory(connectionFactory);
connectionFactory = destination.newSslClientConnectionFactory(connectionFactory);
org.eclipse.jetty.io.Connection newConnection = connectionFactory.newConnection(getEndPoint(), context);
getEndPoint().upgrade(newConnection);
if (LOG.isDebugEnabled())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -102,11 +103,12 @@ public HttpClientTransportDynamic(ClientConnector connector, ClientConnectionFac
super(connector);
addBean(connector);
if (factoryInfos.length == 0)
throw new IllegalArgumentException("Missing ClientConnectionFactory");
factoryInfos = new Info[]{HttpClientConnectionFactory.HTTP11};
this.factoryInfos = Arrays.asList(factoryInfos);
this.protocols = Arrays.stream(factoryInfos)
.flatMap(info -> info.getProtocols().stream())
.distinct()
.map(p -> p.toLowerCase(Locale.ENGLISH))
.collect(Collectors.toList());
for (ClientConnectionFactory.Info factoryInfo : factoryInfos)
addBean(factoryInfo);
Expand All @@ -120,22 +122,29 @@ public HttpDestination.Key newDestinationKey(HttpRequest request, Origin origin)
boolean ssl = HttpScheme.HTTPS.is(request.getScheme());
String http2 = ssl ? "h2" : "h2c";
List<String> protocols = List.of();
if (request.getVersion() == HttpVersion.HTTP_2)
if (request.isVersionExplicit())
{
// The application is explicitly asking for HTTP/2, so exclude HTTP/1.1.
if (this.protocols.contains(http2))
protocols = List.of(http2);
HttpVersion version = request.getVersion();
String desired = version == HttpVersion.HTTP_2 ? http2 : "http/1.1";
if (this.protocols.contains(desired))
protocols = List.of(desired);
}
else
{
// TODO: I don't think this is right.
// The case [http/1.1, h2c] is troublesome because we cannot
// make a single Destination for both - we would have problems
// with the ConnectionPool. We really need to pick one here,
// say the first that we can speak - this is what we do below
// in newConnection() anyway.
// Preserve the order of protocols chosen by the application.
protocols = this.protocols.stream()
.filter(p -> p.equals("http/1.1") || p.equals(http2))
.collect(Collectors.toList());
}
if (protocols.isEmpty())
return new HttpDestination.Key(origin, null);
return new HttpDestination.Key(origin, new HttpDestination.Protocol(protocols, ssl));
return new HttpDestination.Key(origin, new HttpDestination.Protocol(protocols, ssl && protocols.contains(http2)));
}

@Override
Expand Down
Loading

0 comments on commit 7743708

Please sign in to comment.