Skip to content

Commit

Permalink
urlconnectionsender, but no tests
Browse files Browse the repository at this point in the history
Signed-off-by: Adrian Cole <adrian@tetrate.io>
  • Loading branch information
Adrian Cole committed Feb 11, 2024
1 parent dd2e022 commit 2bfe714
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ final class ActiveMQContainer extends GenericContainer<ActiveMQContainer> {
static final int ACTIVEMQ_PORT = 61616;

ActiveMQContainer() {
super(parse("ghcr.io/openzipkin/zipkin-activemq:3.0.2"));
super(parse("ghcr.io/openzipkin/zipkin-activemq:3.0.6"));
withExposedPorts(ACTIVEMQ_PORT);
waitStrategy = Wait.forListeningPorts(ACTIVEMQ_PORT);
withStartupTimeout(Duration.ofSeconds(60));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ final class RabbitMQContainer extends GenericContainer<RabbitMQContainer> {
static final int RABBIT_PORT = 5672;

RabbitMQContainer() {
super(parse("ghcr.io/openzipkin/zipkin-rabbitmq:3.0.2"));
super(parse("ghcr.io/openzipkin/zipkin-rabbitmq:3.0.6"));
withExposedPorts(RABBIT_PORT);
waitStrategy = Wait.forLogMessage(".*Server startup complete.*", 1);
withStartupTimeout(Duration.ofSeconds(60));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class KafkaSenderBenchmarks extends SenderBenchmarks {

static final class KafkaContainer extends GenericContainer<KafkaContainer> {
KafkaContainer() {
super(parse("ghcr.io/openzipkin/zipkin-kafka:3.0.2"));
super(parse("ghcr.io/openzipkin/zipkin-kafka:3.0.6"));
waitStrategy = Wait.forHealthcheck();
// Kafka broker listener port (19092) needs to be exposed for test cases to access it.
addFixedExposedPort(KAFKA_PORT, KAFKA_PORT, InternetProtocol.TCP);
Expand Down
27 changes: 20 additions & 7 deletions core/src/main/java/zipkin2/reporter/HttpEndpointSupplier.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,20 @@
* HTTP-based {@link BytesMessageSender senders} use this to resolve a potentially-pseudo endpoint
* passed by configuration to a real endpoint.
*
* <p>Senders should consider the special value {@link #FIXED_FACTORY} and the type {@link Fixed} to
* avoid dynamic lookups when constants will be returned.
* <h3>Usage Notes</h3>
*
* <p>Sender should implement the following logic:
* <ul>
* <li>During build, the sender should invoke the {@linkplain Factory}.</li>
* <li>If the result is {@link Fixed}, build the sender to use a static value.</li>
* <li>Otherwise, call {@link HttpEndpointSupplier#get()} each time
* {@linkplain BytesMessageSender#send(List)} is invoked.</li>
* </ul>
*
* <h3>Implementation Notes</h3>
*
* <p>Implement friendly {@code toString()} functions, that include the real endpoint or the one
* passed to the {@linkplain Factory}.
*
* <p>Senders are not called during production requests, rather in time or size bounded loop, in a
* separate async reporting thread. Implementations that resolve endpoints via remote calls, such as
Expand All @@ -35,13 +47,13 @@
*/
public interface HttpEndpointSupplier {
/**
* HTTP {@link BytesMessageSender sender} builders check for this symbol, and will substitute its
* input as a fixed endpoint value rather than perform dynamic lookups.
* HTTP {@link BytesMessageSender sender} builders check for this symbol, and return the input as
* a {@linkplain Fixed} result rather than perform dynamic lookups.
*
* @since 3.3
*/
Factory FIXED_FACTORY = new Factory() {
@Override public HttpEndpointSupplier create(String endpoint) {
@Override public Fixed create(String endpoint) {
return new Fixed(endpoint);
}
};
Expand All @@ -60,8 +72,7 @@ public interface HttpEndpointSupplier {
* Factory passed to HTTP {@link BytesMessageSender sender} builders to control resolution of the
* static endpoint from configuration.
*
* <p>Unless this is {@linkplain #FIXED_FACTORY}, {@linkplain #create(String)} will be deferred to
* the first call to {@linkplain BytesMessageSender#send(List)}.
* <p>Invoke this when building a sender, not during {@linkplain BytesMessageSender#send(List)}.
*
* @since 3.3
*/
Expand All @@ -81,6 +92,8 @@ interface Factory {

/**
* HTTP {@link BytesMessageSender senders} check for this type, and will cache its first value.
*
* @since 3.3
*/
final class Fixed implements HttpEndpointSupplier {
private final String endpoint;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ final class KafkaContainer extends GenericContainer<KafkaContainer> {
static final int KAFKA_PORT = 19092;

KafkaContainer() {
super(parse("ghcr.io/openzipkin/zipkin-kafka:3.0.2"));
super(parse("ghcr.io/openzipkin/zipkin-kafka:3.0.6"));
waitStrategy = Wait.forHealthcheck();
// Kafka broker listener port (19092) needs to be exposed for test cases to access it.
addFixedExposedPort(KAFKA_PORT, KAFKA_PORT, InternetProtocol.TCP);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ final class ZipkinContainer extends GenericContainer<ZipkinContainer> {
static final int HTTP_PORT = 9411;

ZipkinContainer() {
super(parse("ghcr.io/openzipkin/zipkin:3.0.2"));
super(parse("ghcr.io/openzipkin/zipkin:3.0.6"));
// zipkin-server disables scribe by default.
withEnv("COLLECTOR_SCRIBE_ENABLED", "true");
withExposedPorts(SCRIBE_PORT, HTTP_PORT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import zipkin2.reporter.CheckResult;
import zipkin2.reporter.ClosedSenderException;
import zipkin2.reporter.Encoding;
import zipkin2.reporter.HttpEndpointSupplier;
import zipkin2.reporter.Sender;

/**
Expand All @@ -45,13 +46,15 @@ public static Builder newBuilder() {
}

public static final class Builder {
URL endpoint;
HttpEndpointSupplier.Factory endpointSupplierFactory = HttpEndpointSupplier.FIXED_FACTORY;
String endpoint;
Encoding encoding = Encoding.JSON;
int messageMaxBytes = 500000;
int connectTimeout = 10 * 1000, readTimeout = 60 * 1000;
boolean compressionEnabled = true;

Builder(URLConnectionSender sender) {
this.endpointSupplierFactory = sender.endpointSupplierFactory;
this.endpoint = sender.endpoint;
this.encoding = sender.encoding;
this.messageMaxBytes = sender.messageMaxBytes;
Expand All @@ -67,17 +70,24 @@ public static final class Builder {
// customizable so that users can re-map /api/v2/spans ex for browser-originated traces
public Builder endpoint(String endpoint) {
if (endpoint == null) throw new NullPointerException("endpoint == null");

try {
return endpoint(new URL(endpoint));
} catch (MalformedURLException e) {
throw new IllegalArgumentException(e.getMessage());
}
this.endpoint = endpoint;
return this;
}

public Builder endpoint(URL endpoint) {
if (endpoint == null) throw new NullPointerException("endpoint == null");
this.endpoint = endpoint;
this.endpoint = endpoint.toString();
return this;
}

/**
* No default. See JavaDoc on {@link HttpEndpointSupplier} for implementation notes.
*/
public Builder endpointSupplierFactory(HttpEndpointSupplier.Factory endpointSupplierFactory) {
if (endpointSupplierFactory == null) {
throw new NullPointerException("endpointSupplierFactory == null");
}
this.endpointSupplierFactory = endpointSupplierFactory;
return this;
}

Expand Down Expand Up @@ -118,24 +128,82 @@ public Builder encoding(Encoding encoding) {
}

public URLConnectionSender build() {
return new URLConnectionSender(this);
String endpoint = this.endpoint;
if (endpoint == null) throw new NullPointerException("endpoint == null");

HttpEndpointSupplier endpointSupplier = endpointSupplierFactory.create(endpoint);
if (endpointSupplier == null) throw new NullPointerException("endpointSupplier == null");
if (endpointSupplier instanceof HttpEndpointSupplier.Fixed) {
endpoint = endpointSupplier.get(); // eagerly resolve the endpoint
return new URLConnectionSender(this, new ConstantHttpURLConnectionSupplier(endpoint));
}
return new URLConnectionSender(this, new DynamicHttpURLConnectionSupplier(endpointSupplier));
}

Builder() {
}
}

final URL endpoint;
private static URL toURL(String endpoint) {
try {
return new URL(endpoint);
} catch (MalformedURLException e) {
throw new IllegalArgumentException(e.getMessage());
}
}

interface HttpURLConnectionSupplier {
HttpURLConnection openConnection() throws IOException;
}

static final class ConstantHttpURLConnectionSupplier implements HttpURLConnectionSupplier {
final URL endpoint;

ConstantHttpURLConnectionSupplier(String endpoint) {
this.endpoint = toURL(endpoint);
}

@Override public HttpURLConnection openConnection() throws IOException {
return (HttpURLConnection) endpoint.openConnection();
}

@Override public String toString() {
return endpoint.toString();
}
}

static final class DynamicHttpURLConnectionSupplier implements HttpURLConnectionSupplier {
final HttpEndpointSupplier endpointSupplier;

DynamicHttpURLConnectionSupplier(HttpEndpointSupplier endpointSupplier) {
this.endpointSupplier = endpointSupplier;
}

@Override public HttpURLConnection openConnection() throws IOException {
URL endpoint = toURL(endpointSupplier.get());
return (HttpURLConnection) endpoint.openConnection();
}

@Override public String toString() {
return endpointSupplier.toString();
}
}

final HttpEndpointSupplier.Factory endpointSupplierFactory; // for toBuilder()
final String endpoint; // for toBuilder()

final HttpURLConnectionSupplier connectionSupplier;
final Encoding encoding;
final String mediaType;
final BytesMessageEncoder encoder;
final int messageMaxBytes;
final int connectTimeout, readTimeout;
final boolean compressionEnabled;

URLConnectionSender(Builder builder) {
if (builder.endpoint == null) throw new NullPointerException("endpoint == null");
URLConnectionSender(Builder builder, HttpURLConnectionSupplier connectionSupplier) {
this.endpointSupplierFactory = builder.endpointSupplierFactory;
this.endpoint = builder.endpoint;
this.connectionSupplier = connectionSupplier;
this.encoding = builder.encoding;
switch (builder.encoding) {
case JSON:
Expand Down Expand Up @@ -207,7 +275,7 @@ public Builder toBuilder() {

void send(byte[] body, String mediaType) throws IOException {
// intentionally not closing the connection, to use keep-alives
HttpURLConnection connection = (HttpURLConnection) endpoint.openConnection();
HttpURLConnection connection = connectionSupplier.openConnection();
connection.setConnectTimeout(connectTimeout);
connection.setReadTimeout(readTimeout);
connection.setRequestMethod("POST");
Expand Down Expand Up @@ -262,7 +330,7 @@ static IOException skipAndSuppress(InputStream in) {
}

@Override public String toString() {
return "URLConnectionSender{" + endpoint + "}";
return "URLConnectionSender{" + connectionSupplier + "}";
}

class HttpPostCall extends Call.Base<Void> {
Expand Down

0 comments on commit 2bfe714

Please sign in to comment.