Skip to content

Commit

Permalink
Mount storage service on zipkin-server (#58)
Browse files Browse the repository at this point in the history
* initial effort to mount storage service on zipkin-server

* remove unused property

* refactor: expose http service via storage

* refactor: expose http service via storage

* enabling endpoint via configurator to be fixed by consumer<serverBuilder>

* refactor: constant http path prefix

* fix tests and back to consumer<serverBuilder>

* back to server config until next zipkin release available

* remove additional port

* add default port
  • Loading branch information
jeqo authored Oct 30, 2019
1 parent a8c0e8e commit adf3b8c
Show file tree
Hide file tree
Showing 12 changed files with 54 additions and 50 deletions.
1 change: 0 additions & 1 deletion docker/single-instance/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ services:
hostname: zipkin # required to route call to scatter-gather endpoint properly. should not be needed after #40 is solved
ports:
- 9411:9411
- 9412:9412
environment:
KAFKA_BOOTSTRAP_SERVERS: kafka-zookeeper:9092
KAFKA_STORAGE_DIR: /data
Expand Down
6 changes: 5 additions & 1 deletion module/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,11 @@
<dependency>
<groupId>${armeria.groupId}</groupId>
<artifactId>armeria</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>${armeria.groupId}</groupId>
<artifactId>armeria-spring-boot-autoconfigure</artifactId>
<version>${armeria.version}</version>
</dependency>
</dependencies>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,40 @@
*/
package zipkin2.module.storage.kafka;

import com.linecorp.armeria.spring.ArmeriaServerConfigurator;
import java.util.List;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import zipkin2.storage.StorageComponent;
import zipkin2.storage.kafka.KafkaStorage;

@Configuration
import static zipkin2.storage.kafka.KafkaStorage.HTTP_PATH_PREFIX;

@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties(ZipkinKafkaStorageProperties.class)
@ConditionalOnProperty(name = "zipkin.storage.type", havingValue = "kafka")
@ConditionalOnMissingBean(StorageComponent.class)
class ZipkinKafkaStorageModule {

@Bean
@ConditionalOnMissingBean
StorageComponent storage(ZipkinKafkaStorageProperties properties) {
return properties.toBuilder().build();
@ConditionalOnMissingBean @Bean StorageComponent storage(
@Value("${zipkin.storage.search-enabled:true}") boolean searchEnabled,
@Value("${zipkin.storage.autocomplete-keys:}") List<String> autocompleteKeys,
@Value("${server.port:9411}") int port,
ZipkinKafkaStorageProperties properties) {
return properties.toBuilder()
.searchEnabled(searchEnabled)
.autocompleteKeys(autocompleteKeys)
.serverPort(port)
.build();
}

// TODO: to be changed when >zipkin 2.18.4 #61
// @Bean public Consumer<ServerBuilder> storageHttpService(StorageComponent storage) {
@Bean public ArmeriaServerConfigurator storageHttpService(StorageComponent storage) {
return sb -> sb.annotatedService(HTTP_PATH_PREFIX, ((KafkaStorage) storage).httpService());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ KafkaStorageBuilder toBuilder() {
if (dependencyStoreStreamAppId != null) {
builder.dependencyStoreStreamAppId(dependencyStoreStreamAppId);
}

return builder;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
/** opens package access for testing */
public final class Access {
public static void registerKafka(AnnotationConfigApplicationContext context) {
context.register(
PropertyPlaceholderAutoConfiguration.class, ZipkinKafkaStorageModule.class);
context.register(PropertyPlaceholderAutoConfiguration.class, ZipkinKafkaStorageModule.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

public class ZipkinKafkaStorageModuleTest {
class ZipkinKafkaStorageModuleTest {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();

@AfterEach void close() {
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
<zipkin.groupId>io.zipkin.zipkin2</zipkin.groupId>
<zipkin.version>2.18.3</zipkin.version>
<armeria.groupId>com.linecorp.armeria</armeria.groupId>
<armeria.version>0.94.0</armeria.version>
<armeria.version>0.95.0</armeria.version>
<spring-boot.version>2.2.0.RELEASE</spring-boot.version>

<log4j.version>2.12.1</log4j.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
/**
* Span store backed by Kafka Stream distributed state stores built by {@link
* TraceStoreTopologySupplier} and {@link DependencyStoreTopologySupplier}, and made accessible by
* {@link KafkaStoreHttpService}.
* {@link KafkaStorageHttpService}.
*/
final class KafkaSpanStore implements SpanStore, Traces, ServiceAndSpanNames {
static final ObjectMapper MAPPER = new ObjectMapper();
Expand Down
30 changes: 5 additions & 25 deletions storage/src/main/java/zipkin2/storage/kafka/KafkaStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@
* </ul>
*/
public class KafkaStorage extends StorageComponent {
public static final String HTTP_PATH_PREFIX = "/storage/kafka";

static final Logger LOG = LogManager.getLogger();

public static KafkaStorageBuilder newBuilder() {
Expand Down Expand Up @@ -181,7 +183,6 @@ void checkResources() {
if (searchEnabled) {
getTraceStoreStream();
getDependencyStoreStream();
getServer();
}
}

Expand All @@ -207,9 +208,6 @@ void checkResources() {
return CheckResult.failed(
new IllegalStateException("Store stream not running. " + dependencyStateStore));
}
if (!getServer().activePort().isPresent()) {
return CheckResult.failed(new IllegalStateException("Storage HTTP server not running."));
}
}
return CheckResult.OK;
} catch (Exception e) {
Expand Down Expand Up @@ -323,31 +321,13 @@ KafkaStreams getAggregationStream() {
return traceAggregationStream;
}

@SuppressWarnings("FutureReturnValueIgnored")
Server getServer() {
if (server == null) {
synchronized (this) {
if (server == null) {
try {
server = Server.builder()
.http(httpPort)
.annotatedService(new KafkaStoreHttpService(this))
.build();
server.start();
} catch (Exception e) {
LOG.error("Error starting http server", e);
server = null;
}
}
}
}
return server;
public KafkaStorageHttpService httpService() {
return new KafkaStorageHttpService(this);
}

@Override public String toString() {
return "KafkaStorage{" +
"httpPort=" + httpPort +
", spanConsumerEnabled=" + spanConsumerEnabled +
"spanConsumerEnabled=" + spanConsumerEnabled +
", searchEnabled=" + searchEnabled +
", storageDir='" + storageDir + '\'' +
'}';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.apache.kafka.streams.StreamsConfig;
import zipkin2.storage.StorageComponent;

import static zipkin2.storage.kafka.KafkaStorage.HTTP_PATH_PREFIX;

// extracted as the type is huge
public final class KafkaStorageBuilder extends StorageComponent.Builder {
boolean spanConsumerEnabled = true;
Expand All @@ -42,9 +44,9 @@ public final class KafkaStorageBuilder extends StorageComponent.Builder {

long minTracesStored = 10_000;
String hostname = "localhost";
int httpPort = 9412;
int httpPort = 9411;
BiFunction<String, Integer, String> httpBaseUrl =
(hostname, port) -> "http://" + hostname + ":" + port;
(hostname, port) -> "http://" + hostname + ":" + port + HTTP_PATH_PREFIX;

String storageDir = "/tmp/zipkin-storage-kafka";

Expand Down Expand Up @@ -139,7 +141,7 @@ public KafkaStorageBuilder hostname(String hostname) {
return this;
}

public KafkaStorageBuilder httpPort(int httpPort) {
public KafkaStorageBuilder serverPort(int httpPort) {
this.httpPort = httpPort;
traceStoreStreamConfig.put(StreamsConfig.APPLICATION_SERVER_CONFIG, hostInfo());
dependencyStoreStreamConfig.put(StreamsConfig.APPLICATION_SERVER_CONFIG, hostInfo());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.linecorp.armeria.common.AggregatedHttpResponse;
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.common.MediaType;
import com.linecorp.armeria.server.ServerBuilder;
import com.linecorp.armeria.server.annotation.Default;
import com.linecorp.armeria.server.annotation.Get;
import com.linecorp.armeria.server.annotation.Param;
Expand All @@ -31,7 +30,6 @@
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.KeyValueIterator;
Expand Down Expand Up @@ -65,22 +63,18 @@
* distributed state. This component exposes access to local state via Http call from {@link
* KafkaSpanStore}
*/
final class KafkaStoreHttpService implements Consumer<ServerBuilder> {
final class KafkaStorageHttpService {
static final Logger LOG = LogManager.getLogger();
static final ObjectMapper MAPPER = new ObjectMapper();

final KafkaStorage storage;
final long minTracesStored;

KafkaStoreHttpService(KafkaStorage storage) {
KafkaStorageHttpService(KafkaStorage storage) {
this.storage = storage;
this.minTracesStored = storage.minTracesStored;
}

@Override public void accept(ServerBuilder serverBuilder) {
serverBuilder.annotatedService("/zipkin/storage/kafka", this);
}

@Get("/dependencies")
public AggregatedHttpResponse getDependencies(
@Param("endTs") long endTs,
Expand Down
11 changes: 10 additions & 1 deletion storage/src/test/java/zipkin2/storage/kafka/KafkaStorageIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package zipkin2.storage.kafka;

import com.linecorp.armeria.server.Server;
import java.io.IOException;
import java.net.ServerSocket;
import java.time.Duration;
Expand Down Expand Up @@ -61,6 +62,7 @@ class KafkaStorageIT {

Duration traceTimeout;
KafkaStorage storage;
Server server;
Properties consumerConfig;
KafkaProducer<String, List<Span>> tracesProducer;
KafkaProducer<String, DependencyLink> dependencyProducer;
Expand All @@ -77,12 +79,18 @@ class KafkaStorageIT {
assertThat(kafka.isRunning()).isTrue();

traceTimeout = Duration.ofSeconds(5);
int httpPort = randomPort();
storage = (KafkaStorage) KafkaStorage.newBuilder()
.bootstrapServers(kafka.getBootstrapServers())
.storageDir("target/zipkin_" + System.currentTimeMillis())
.traceTimeout(traceTimeout)
.httpPort(randomPort())
.serverPort(httpPort)
.build();
server = Server.builder()
.http(httpPort)
.annotatedService(KafkaStorage.HTTP_PATH_PREFIX, storage.httpService())
.build();
server.start();

Collection<NewTopic> newTopics = new ArrayList<>();
newTopics.add(new NewTopic(storage.aggregationSpansTopic, 1, (short) 1));
Expand All @@ -107,6 +115,7 @@ class KafkaStorageIT {
tracesProducer = null;
storage.close();
storage = null;
server.close();
spansSerde.close();
dependencyLinkSerde.close();
}
Expand Down

0 comments on commit adf3b8c

Please sign in to comment.