diff --git a/docker/single-instance/docker-compose.yml b/docker/single-instance/docker-compose.yml index 6812e20c..98980677 100644 --- a/docker/single-instance/docker-compose.yml +++ b/docker/single-instance/docker-compose.yml @@ -41,7 +41,6 @@ services: hostname: zipkin # required to route call to scatter-gather endpoint properly. should not be needed after #40 is solved ports: - 9411:9411 - - 9412:9412 environment: KAFKA_BOOTSTRAP_SERVERS: kafka-zookeeper:9092 KAFKA_STORAGE_DIR: /data diff --git a/module/pom.xml b/module/pom.xml index 77d15f87..52fae6af 100644 --- a/module/pom.xml +++ b/module/pom.xml @@ -50,7 +50,11 @@ ${armeria.groupId} armeria - test + + + ${armeria.groupId} + armeria-spring-boot-autoconfigure + ${armeria.version} diff --git a/module/src/main/java/zipkin2/module/storage/kafka/ZipkinKafkaStorageModule.java b/module/src/main/java/zipkin2/module/storage/kafka/ZipkinKafkaStorageModule.java index a9ae8b5d..16bfd8af 100644 --- a/module/src/main/java/zipkin2/module/storage/kafka/ZipkinKafkaStorageModule.java +++ b/module/src/main/java/zipkin2/module/storage/kafka/ZipkinKafkaStorageModule.java @@ -13,22 +13,40 @@ */ package zipkin2.module.storage.kafka; +import com.linecorp.armeria.spring.ArmeriaServerConfigurator; +import java.util.List; +import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import zipkin2.storage.StorageComponent; +import zipkin2.storage.kafka.KafkaStorage; -@Configuration +import static zipkin2.storage.kafka.KafkaStorage.HTTP_PATH_PREFIX; + +@Configuration(proxyBeanMethods = false) @EnableConfigurationProperties(ZipkinKafkaStorageProperties.class) @ConditionalOnProperty(name = "zipkin.storage.type", havingValue = "kafka") @ConditionalOnMissingBean(StorageComponent.class) class ZipkinKafkaStorageModule { - @Bean - @ConditionalOnMissingBean - StorageComponent storage(ZipkinKafkaStorageProperties properties) { - return properties.toBuilder().build(); + @ConditionalOnMissingBean @Bean StorageComponent storage( + @Value("${zipkin.storage.search-enabled:true}") boolean searchEnabled, + @Value("${zipkin.storage.autocomplete-keys:}") List autocompleteKeys, + @Value("${server.port:9411}") int port, + ZipkinKafkaStorageProperties properties) { + return properties.toBuilder() + .searchEnabled(searchEnabled) + .autocompleteKeys(autocompleteKeys) + .serverPort(port) + .build(); + } + + // TODO: to be changed when >zipkin 2.18.4 #61 + // @Bean public Consumer storageHttpService(StorageComponent storage) { + @Bean public ArmeriaServerConfigurator storageHttpService(StorageComponent storage) { + return sb -> sb.annotatedService(HTTP_PATH_PREFIX, ((KafkaStorage) storage).httpService()); } } diff --git a/module/src/main/java/zipkin2/module/storage/kafka/ZipkinKafkaStorageProperties.java b/module/src/main/java/zipkin2/module/storage/kafka/ZipkinKafkaStorageProperties.java index 1ef1d2b6..4a7c9165 100644 --- a/module/src/main/java/zipkin2/module/storage/kafka/ZipkinKafkaStorageProperties.java +++ b/module/src/main/java/zipkin2/module/storage/kafka/ZipkinKafkaStorageProperties.java @@ -106,7 +106,6 @@ KafkaStorageBuilder toBuilder() { if (dependencyStoreStreamAppId != null) { builder.dependencyStoreStreamAppId(dependencyStoreStreamAppId); } - return builder; } diff --git a/module/src/test/java/zipkin2/module/storage/kafka/Access.java b/module/src/test/java/zipkin2/module/storage/kafka/Access.java index ba463afe..7bebd793 100644 --- a/module/src/test/java/zipkin2/module/storage/kafka/Access.java +++ b/module/src/test/java/zipkin2/module/storage/kafka/Access.java @@ -19,7 +19,6 @@ /** opens package access for testing */ public final class Access { public static void registerKafka(AnnotationConfigApplicationContext context) { - context.register( - PropertyPlaceholderAutoConfiguration.class, ZipkinKafkaStorageModule.class); + context.register(PropertyPlaceholderAutoConfiguration.class, ZipkinKafkaStorageModule.class); } } diff --git a/module/src/test/java/zipkin2/storage/kafka/ZipkinKafkaStorageModuleTest.java b/module/src/test/java/zipkin2/storage/kafka/ZipkinKafkaStorageModuleTest.java index 2103bc1e..9b5ddca3 100644 --- a/module/src/test/java/zipkin2/storage/kafka/ZipkinKafkaStorageModuleTest.java +++ b/module/src/test/java/zipkin2/storage/kafka/ZipkinKafkaStorageModuleTest.java @@ -26,7 +26,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -public class ZipkinKafkaStorageModuleTest { +class ZipkinKafkaStorageModuleTest { AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(); @AfterEach void close() { diff --git a/pom.xml b/pom.xml index e7ee5256..3d3984d6 100644 --- a/pom.xml +++ b/pom.xml @@ -53,7 +53,7 @@ io.zipkin.zipkin2 2.18.3 com.linecorp.armeria - 0.94.0 + 0.95.0 2.2.0.RELEASE 2.12.1 diff --git a/storage/src/main/java/zipkin2/storage/kafka/KafkaSpanStore.java b/storage/src/main/java/zipkin2/storage/kafka/KafkaSpanStore.java index db8bbe69..bd694310 100644 --- a/storage/src/main/java/zipkin2/storage/kafka/KafkaSpanStore.java +++ b/storage/src/main/java/zipkin2/storage/kafka/KafkaSpanStore.java @@ -56,7 +56,7 @@ /** * Span store backed by Kafka Stream distributed state stores built by {@link * TraceStoreTopologySupplier} and {@link DependencyStoreTopologySupplier}, and made accessible by - * {@link KafkaStoreHttpService}. + * {@link KafkaStorageHttpService}. */ final class KafkaSpanStore implements SpanStore, Traces, ServiceAndSpanNames { static final ObjectMapper MAPPER = new ObjectMapper(); diff --git a/storage/src/main/java/zipkin2/storage/kafka/KafkaStorage.java b/storage/src/main/java/zipkin2/storage/kafka/KafkaStorage.java index a61bc500..66cde7e7 100644 --- a/storage/src/main/java/zipkin2/storage/kafka/KafkaStorage.java +++ b/storage/src/main/java/zipkin2/storage/kafka/KafkaStorage.java @@ -53,6 +53,8 @@ * */ public class KafkaStorage extends StorageComponent { + public static final String HTTP_PATH_PREFIX = "/storage/kafka"; + static final Logger LOG = LogManager.getLogger(); public static KafkaStorageBuilder newBuilder() { @@ -181,7 +183,6 @@ void checkResources() { if (searchEnabled) { getTraceStoreStream(); getDependencyStoreStream(); - getServer(); } } @@ -207,9 +208,6 @@ void checkResources() { return CheckResult.failed( new IllegalStateException("Store stream not running. " + dependencyStateStore)); } - if (!getServer().activePort().isPresent()) { - return CheckResult.failed(new IllegalStateException("Storage HTTP server not running.")); - } } return CheckResult.OK; } catch (Exception e) { @@ -323,31 +321,13 @@ KafkaStreams getAggregationStream() { return traceAggregationStream; } - @SuppressWarnings("FutureReturnValueIgnored") - Server getServer() { - if (server == null) { - synchronized (this) { - if (server == null) { - try { - server = Server.builder() - .http(httpPort) - .annotatedService(new KafkaStoreHttpService(this)) - .build(); - server.start(); - } catch (Exception e) { - LOG.error("Error starting http server", e); - server = null; - } - } - } - } - return server; + public KafkaStorageHttpService httpService() { + return new KafkaStorageHttpService(this); } @Override public String toString() { return "KafkaStorage{" + - "httpPort=" + httpPort + - ", spanConsumerEnabled=" + spanConsumerEnabled + + "spanConsumerEnabled=" + spanConsumerEnabled + ", searchEnabled=" + searchEnabled + ", storageDir='" + storageDir + '\'' + '}'; diff --git a/storage/src/main/java/zipkin2/storage/kafka/KafkaStorageBuilder.java b/storage/src/main/java/zipkin2/storage/kafka/KafkaStorageBuilder.java index 224c2ea4..8ee0dc60 100644 --- a/storage/src/main/java/zipkin2/storage/kafka/KafkaStorageBuilder.java +++ b/storage/src/main/java/zipkin2/storage/kafka/KafkaStorageBuilder.java @@ -27,6 +27,8 @@ import org.apache.kafka.streams.StreamsConfig; import zipkin2.storage.StorageComponent; +import static zipkin2.storage.kafka.KafkaStorage.HTTP_PATH_PREFIX; + // extracted as the type is huge public final class KafkaStorageBuilder extends StorageComponent.Builder { boolean spanConsumerEnabled = true; @@ -42,9 +44,9 @@ public final class KafkaStorageBuilder extends StorageComponent.Builder { long minTracesStored = 10_000; String hostname = "localhost"; - int httpPort = 9412; + int httpPort = 9411; BiFunction httpBaseUrl = - (hostname, port) -> "http://" + hostname + ":" + port; + (hostname, port) -> "http://" + hostname + ":" + port + HTTP_PATH_PREFIX; String storageDir = "/tmp/zipkin-storage-kafka"; @@ -139,7 +141,7 @@ public KafkaStorageBuilder hostname(String hostname) { return this; } - public KafkaStorageBuilder httpPort(int httpPort) { + public KafkaStorageBuilder serverPort(int httpPort) { this.httpPort = httpPort; traceStoreStreamConfig.put(StreamsConfig.APPLICATION_SERVER_CONFIG, hostInfo()); dependencyStoreStreamConfig.put(StreamsConfig.APPLICATION_SERVER_CONFIG, hostInfo()); diff --git a/storage/src/main/java/zipkin2/storage/kafka/KafkaStoreHttpService.java b/storage/src/main/java/zipkin2/storage/kafka/KafkaStorageHttpService.java similarity index 97% rename from storage/src/main/java/zipkin2/storage/kafka/KafkaStoreHttpService.java rename to storage/src/main/java/zipkin2/storage/kafka/KafkaStorageHttpService.java index eb5b8cc8..b7039555 100644 --- a/storage/src/main/java/zipkin2/storage/kafka/KafkaStoreHttpService.java +++ b/storage/src/main/java/zipkin2/storage/kafka/KafkaStorageHttpService.java @@ -19,7 +19,6 @@ import com.linecorp.armeria.common.AggregatedHttpResponse; import com.linecorp.armeria.common.HttpStatus; import com.linecorp.armeria.common.MediaType; -import com.linecorp.armeria.server.ServerBuilder; import com.linecorp.armeria.server.annotation.Default; import com.linecorp.armeria.server.annotation.Get; import com.linecorp.armeria.server.annotation.Param; @@ -31,7 +30,6 @@ import java.util.List; import java.util.Optional; import java.util.Set; -import java.util.function.Consumer; import java.util.stream.Collectors; import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.state.KeyValueIterator; @@ -65,22 +63,18 @@ * distributed state. This component exposes access to local state via Http call from {@link * KafkaSpanStore} */ -final class KafkaStoreHttpService implements Consumer { +final class KafkaStorageHttpService { static final Logger LOG = LogManager.getLogger(); static final ObjectMapper MAPPER = new ObjectMapper(); final KafkaStorage storage; final long minTracesStored; - KafkaStoreHttpService(KafkaStorage storage) { + KafkaStorageHttpService(KafkaStorage storage) { this.storage = storage; this.minTracesStored = storage.minTracesStored; } - @Override public void accept(ServerBuilder serverBuilder) { - serverBuilder.annotatedService("/zipkin/storage/kafka", this); - } - @Get("/dependencies") public AggregatedHttpResponse getDependencies( @Param("endTs") long endTs, diff --git a/storage/src/test/java/zipkin2/storage/kafka/KafkaStorageIT.java b/storage/src/test/java/zipkin2/storage/kafka/KafkaStorageIT.java index bb50eea5..5360e072 100644 --- a/storage/src/test/java/zipkin2/storage/kafka/KafkaStorageIT.java +++ b/storage/src/test/java/zipkin2/storage/kafka/KafkaStorageIT.java @@ -13,6 +13,7 @@ */ package zipkin2.storage.kafka; +import com.linecorp.armeria.server.Server; import java.io.IOException; import java.net.ServerSocket; import java.time.Duration; @@ -61,6 +62,7 @@ class KafkaStorageIT { Duration traceTimeout; KafkaStorage storage; + Server server; Properties consumerConfig; KafkaProducer> tracesProducer; KafkaProducer dependencyProducer; @@ -77,12 +79,18 @@ class KafkaStorageIT { assertThat(kafka.isRunning()).isTrue(); traceTimeout = Duration.ofSeconds(5); + int httpPort = randomPort(); storage = (KafkaStorage) KafkaStorage.newBuilder() .bootstrapServers(kafka.getBootstrapServers()) .storageDir("target/zipkin_" + System.currentTimeMillis()) .traceTimeout(traceTimeout) - .httpPort(randomPort()) + .serverPort(httpPort) .build(); + server = Server.builder() + .http(httpPort) + .annotatedService(KafkaStorage.HTTP_PATH_PREFIX, storage.httpService()) + .build(); + server.start(); Collection newTopics = new ArrayList<>(); newTopics.add(new NewTopic(storage.aggregationSpansTopic, 1, (short) 1)); @@ -107,6 +115,7 @@ class KafkaStorageIT { tracesProducer = null; storage.close(); storage = null; + server.close(); spansSerde.close(); dependencyLinkSerde.close(); }