diff --git a/activemq-client/src/test/java/zipkin2/reporter/activemq/ActiveMQContainer.java b/activemq-client/src/test/java/zipkin2/reporter/activemq/ActiveMQContainer.java index 1783bf77..c258a483 100644 --- a/activemq-client/src/test/java/zipkin2/reporter/activemq/ActiveMQContainer.java +++ b/activemq-client/src/test/java/zipkin2/reporter/activemq/ActiveMQContainer.java @@ -28,7 +28,7 @@ final class ActiveMQContainer extends GenericContainer { static final int ACTIVEMQ_PORT = 61616; ActiveMQContainer() { - super(parse("ghcr.io/openzipkin/zipkin-activemq:3.0.2")); + super(parse("ghcr.io/openzipkin/zipkin-activemq:3.0.6")); withExposedPorts(ACTIVEMQ_PORT); waitStrategy = Wait.forListeningPorts(ACTIVEMQ_PORT); withStartupTimeout(Duration.ofSeconds(60)); diff --git a/amqp-client/src/test/java/zipkin2/reporter/amqp/RabbitMQContainer.java b/amqp-client/src/test/java/zipkin2/reporter/amqp/RabbitMQContainer.java index ed900d6b..3e82f69e 100644 --- a/amqp-client/src/test/java/zipkin2/reporter/amqp/RabbitMQContainer.java +++ b/amqp-client/src/test/java/zipkin2/reporter/amqp/RabbitMQContainer.java @@ -29,7 +29,7 @@ final class RabbitMQContainer extends GenericContainer { static final int RABBIT_PORT = 5672; RabbitMQContainer() { - super(parse("ghcr.io/openzipkin/zipkin-rabbitmq:3.0.2")); + super(parse("ghcr.io/openzipkin/zipkin-rabbitmq:3.0.6")); withExposedPorts(RABBIT_PORT); waitStrategy = Wait.forLogMessage(".*Server startup complete.*", 1); withStartupTimeout(Duration.ofSeconds(60)); diff --git a/benchmarks/src/main/java/zipkin2/reporter/KafkaSenderBenchmarks.java b/benchmarks/src/main/java/zipkin2/reporter/KafkaSenderBenchmarks.java index b8805911..67fe6fa1 100644 --- a/benchmarks/src/main/java/zipkin2/reporter/KafkaSenderBenchmarks.java +++ b/benchmarks/src/main/java/zipkin2/reporter/KafkaSenderBenchmarks.java @@ -42,7 +42,7 @@ public class KafkaSenderBenchmarks extends SenderBenchmarks { static final class KafkaContainer extends GenericContainer { KafkaContainer() { - super(parse("ghcr.io/openzipkin/zipkin-kafka:3.0.2")); + super(parse("ghcr.io/openzipkin/zipkin-kafka:3.0.6")); waitStrategy = Wait.forHealthcheck(); // Kafka broker listener port (19092) needs to be exposed for test cases to access it. addFixedExposedPort(KAFKA_PORT, KAFKA_PORT, InternetProtocol.TCP); diff --git a/core/src/main/java/zipkin2/reporter/HttpEndpointSupplier.java b/core/src/main/java/zipkin2/reporter/HttpEndpointSupplier.java index f2a09505..05e2cb96 100644 --- a/core/src/main/java/zipkin2/reporter/HttpEndpointSupplier.java +++ b/core/src/main/java/zipkin2/reporter/HttpEndpointSupplier.java @@ -19,8 +19,20 @@ * HTTP-based {@link BytesMessageSender senders} use this to resolve a potentially-pseudo endpoint * passed by configuration to a real endpoint. * - *

Senders should consider the special value {@link #FIXED_FACTORY} and the type {@link Fixed} to - * avoid dynamic lookups when constants will be returned. + *

Usage Notes

+ * + *

Sender should implement the following logic: + *

    + *
  • During build, the sender should invoke the {@linkplain Factory}.
  • + *
  • If the result is {@link Fixed}, build the sender to use a static value.
  • + *
  • Otherwise, call {@link HttpEndpointSupplier#get()} each time + * {@linkplain BytesMessageSender#send(List)} is invoked.
  • + *
+ * + *

Implementation Notes

+ * + *

Implement friendly {@code toString()} functions, that include the real endpoint or the one + * passed to the {@linkplain Factory}. * *

Senders are not called during production requests, rather in time or size bounded loop, in a * separate async reporting thread. Implementations that resolve endpoints via remote calls, such as @@ -35,13 +47,13 @@ */ public interface HttpEndpointSupplier { /** - * HTTP {@link BytesMessageSender sender} builders check for this symbol, and will substitute its - * input as a fixed endpoint value rather than perform dynamic lookups. + * HTTP {@link BytesMessageSender sender} builders check for this symbol, and return the input as + * a {@linkplain Fixed} result rather than perform dynamic lookups. * * @since 3.3 */ Factory FIXED_FACTORY = new Factory() { - @Override public HttpEndpointSupplier create(String endpoint) { + @Override public Fixed create(String endpoint) { return new Fixed(endpoint); } }; @@ -60,8 +72,7 @@ public interface HttpEndpointSupplier { * Factory passed to HTTP {@link BytesMessageSender sender} builders to control resolution of the * static endpoint from configuration. * - *

Unless this is {@linkplain #FIXED_FACTORY}, {@linkplain #create(String)} will be deferred to - * the first call to {@linkplain BytesMessageSender#send(List)}. + *

Invoke this when building a sender, not during {@linkplain BytesMessageSender#send(List)}. * * @since 3.3 */ @@ -81,6 +92,8 @@ interface Factory { /** * HTTP {@link BytesMessageSender senders} check for this type, and will cache its first value. + * + * @since 3.3 */ final class Fixed implements HttpEndpointSupplier { private final String endpoint; diff --git a/kafka/src/test/java/zipkin2/reporter/kafka/KafkaContainer.java b/kafka/src/test/java/zipkin2/reporter/kafka/KafkaContainer.java index f8452475..0d97e878 100644 --- a/kafka/src/test/java/zipkin2/reporter/kafka/KafkaContainer.java +++ b/kafka/src/test/java/zipkin2/reporter/kafka/KafkaContainer.java @@ -37,7 +37,7 @@ final class KafkaContainer extends GenericContainer { static final int KAFKA_PORT = 19092; KafkaContainer() { - super(parse("ghcr.io/openzipkin/zipkin-kafka:3.0.2")); + super(parse("ghcr.io/openzipkin/zipkin-kafka:3.0.6")); waitStrategy = Wait.forHealthcheck(); // Kafka broker listener port (19092) needs to be exposed for test cases to access it. addFixedExposedPort(KAFKA_PORT, KAFKA_PORT, InternetProtocol.TCP); diff --git a/libthrift/src/test/java/zipkin2/reporter/libthrift/ZipkinContainer.java b/libthrift/src/test/java/zipkin2/reporter/libthrift/ZipkinContainer.java index 85cb8140..6be42cd3 100644 --- a/libthrift/src/test/java/zipkin2/reporter/libthrift/ZipkinContainer.java +++ b/libthrift/src/test/java/zipkin2/reporter/libthrift/ZipkinContainer.java @@ -31,7 +31,7 @@ final class ZipkinContainer extends GenericContainer { static final int HTTP_PORT = 9411; ZipkinContainer() { - super(parse("ghcr.io/openzipkin/zipkin:3.0.2")); + super(parse("ghcr.io/openzipkin/zipkin:3.0.6")); // zipkin-server disables scribe by default. withEnv("COLLECTOR_SCRIBE_ENABLED", "true"); withExposedPorts(SCRIBE_PORT, HTTP_PORT); diff --git a/urlconnection/src/main/java/zipkin2/reporter/urlconnection/URLConnectionSender.java b/urlconnection/src/main/java/zipkin2/reporter/urlconnection/URLConnectionSender.java index a779e16e..10075fbb 100644 --- a/urlconnection/src/main/java/zipkin2/reporter/urlconnection/URLConnectionSender.java +++ b/urlconnection/src/main/java/zipkin2/reporter/urlconnection/URLConnectionSender.java @@ -27,6 +27,7 @@ import zipkin2.reporter.CheckResult; import zipkin2.reporter.ClosedSenderException; import zipkin2.reporter.Encoding; +import zipkin2.reporter.HttpEndpointSupplier; import zipkin2.reporter.Sender; /** @@ -45,13 +46,15 @@ public static Builder newBuilder() { } public static final class Builder { - URL endpoint; + HttpEndpointSupplier.Factory endpointSupplierFactory = HttpEndpointSupplier.FIXED_FACTORY; + String endpoint; Encoding encoding = Encoding.JSON; int messageMaxBytes = 500000; int connectTimeout = 10 * 1000, readTimeout = 60 * 1000; boolean compressionEnabled = true; Builder(URLConnectionSender sender) { + this.endpointSupplierFactory = sender.endpointSupplierFactory; this.endpoint = sender.endpoint; this.encoding = sender.encoding; this.messageMaxBytes = sender.messageMaxBytes; @@ -67,17 +70,24 @@ public static final class Builder { // customizable so that users can re-map /api/v2/spans ex for browser-originated traces public Builder endpoint(String endpoint) { if (endpoint == null) throw new NullPointerException("endpoint == null"); - - try { - return endpoint(new URL(endpoint)); - } catch (MalformedURLException e) { - throw new IllegalArgumentException(e.getMessage()); - } + this.endpoint = endpoint; + return this; } public Builder endpoint(URL endpoint) { if (endpoint == null) throw new NullPointerException("endpoint == null"); - this.endpoint = endpoint; + this.endpoint = endpoint.toString(); + return this; + } + + /** + * No default. See JavaDoc on {@link HttpEndpointSupplier} for implementation notes. + */ + public Builder endpointSupplierFactory(HttpEndpointSupplier.Factory endpointSupplierFactory) { + if (endpointSupplierFactory == null) { + throw new NullPointerException("endpointSupplierFactory == null"); + } + this.endpointSupplierFactory = endpointSupplierFactory; return this; } @@ -118,14 +128,71 @@ public Builder encoding(Encoding encoding) { } public URLConnectionSender build() { - return new URLConnectionSender(this); + String endpoint = this.endpoint; + if (endpoint == null) throw new NullPointerException("endpoint == null"); + + HttpEndpointSupplier endpointSupplier = endpointSupplierFactory.create(endpoint); + if (endpointSupplier == null) throw new NullPointerException("endpointSupplier == null"); + if (endpointSupplier instanceof HttpEndpointSupplier.Fixed) { + endpoint = endpointSupplier.get(); // eagerly resolve the endpoint + return new URLConnectionSender(this, new ConstantHttpURLConnectionSupplier(endpoint)); + } + return new URLConnectionSender(this, new DynamicHttpURLConnectionSupplier(endpointSupplier)); } Builder() { } } - final URL endpoint; + private static URL toURL(String endpoint) { + try { + return new URL(endpoint); + } catch (MalformedURLException e) { + throw new IllegalArgumentException(e.getMessage()); + } + } + + interface HttpURLConnectionSupplier { + HttpURLConnection openConnection() throws IOException; + } + + static final class ConstantHttpURLConnectionSupplier implements HttpURLConnectionSupplier { + final URL endpoint; + + ConstantHttpURLConnectionSupplier(String endpoint) { + this.endpoint = toURL(endpoint); + } + + @Override public HttpURLConnection openConnection() throws IOException { + return (HttpURLConnection) endpoint.openConnection(); + } + + @Override public String toString() { + return endpoint.toString(); + } + } + + static final class DynamicHttpURLConnectionSupplier implements HttpURLConnectionSupplier { + final HttpEndpointSupplier endpointSupplier; + + DynamicHttpURLConnectionSupplier(HttpEndpointSupplier endpointSupplier) { + this.endpointSupplier = endpointSupplier; + } + + @Override public HttpURLConnection openConnection() throws IOException { + URL endpoint = toURL(endpointSupplier.get()); + return (HttpURLConnection) endpoint.openConnection(); + } + + @Override public String toString() { + return endpointSupplier.toString(); + } + } + + final HttpEndpointSupplier.Factory endpointSupplierFactory; // for toBuilder() + final String endpoint; // for toBuilder() + + final HttpURLConnectionSupplier connectionSupplier; final Encoding encoding; final String mediaType; final BytesMessageEncoder encoder; @@ -133,9 +200,10 @@ public URLConnectionSender build() { final int connectTimeout, readTimeout; final boolean compressionEnabled; - URLConnectionSender(Builder builder) { - if (builder.endpoint == null) throw new NullPointerException("endpoint == null"); + URLConnectionSender(Builder builder, HttpURLConnectionSupplier connectionSupplier) { + this.endpointSupplierFactory = builder.endpointSupplierFactory; this.endpoint = builder.endpoint; + this.connectionSupplier = connectionSupplier; this.encoding = builder.encoding; switch (builder.encoding) { case JSON: @@ -207,7 +275,7 @@ public Builder toBuilder() { void send(byte[] body, String mediaType) throws IOException { // intentionally not closing the connection, to use keep-alives - HttpURLConnection connection = (HttpURLConnection) endpoint.openConnection(); + HttpURLConnection connection = connectionSupplier.openConnection(); connection.setConnectTimeout(connectTimeout); connection.setReadTimeout(readTimeout); connection.setRequestMethod("POST"); @@ -262,7 +330,7 @@ static IOException skipAndSuppress(InputStream in) { } @Override public String toString() { - return "URLConnectionSender{" + endpoint + "}"; + return "URLConnectionSender{" + connectionSupplier + "}"; } class HttpPostCall extends Call.Base {