Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mount storage service on zipkin-server #58

Merged
merged 10 commits into from
Oct 30, 2019
6 changes: 5 additions & 1 deletion module/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@
<groupId>${project.groupId}</groupId>
<artifactId>zipkin-storage-kafka</artifactId>
</dependency>
<dependency>
<groupId>${armeria.groupId}</groupId>
<artifactId>armeria-spring-boot-autoconfigure</artifactId>
jeqo marked this conversation as resolved.
Show resolved Hide resolved
<version>${armeria.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
Expand All @@ -50,7 +55,6 @@
<dependency>
<groupId>${armeria.groupId}</groupId>
<artifactId>armeria</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,41 @@
*/
package zipkin2.module.storage.kafka;

import com.linecorp.armeria.server.ServerBuilder;
import com.linecorp.armeria.spring.ArmeriaServerConfigurator;
import java.util.List;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import zipkin2.storage.StorageComponent;
import zipkin2.storage.kafka.KafkaStorage;

@Configuration
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties(ZipkinKafkaStorageProperties.class)
@ConditionalOnProperty(name = "zipkin.storage.type", havingValue = "kafka")
@ConditionalOnMissingBean(StorageComponent.class)
class ZipkinKafkaStorageModule {
static final String QUALIFIER = "zipkinKafkaStorage";

@Bean
@ConditionalOnMissingBean
StorageComponent storage(ZipkinKafkaStorageProperties properties) {
return properties.toBuilder().build();
@ConditionalOnMissingBean @Qualifier(QUALIFIER) @Bean StorageComponent storage(
@Value("${zipkin.storage.search-enabled:true}") boolean searchEnabled,
@Value("${zipkin.storage.autocomplete-keys:}") List<String> autocompleteKeys,
ZipkinKafkaStorageProperties properties) {
return properties.toBuilder()
.searchEnabled(searchEnabled)
.autocompleteKeys(autocompleteKeys)
.build();
}

@Qualifier(QUALIFIER) @Bean public ArmeriaServerConfigurator storageHttpService(
jeqo marked this conversation as resolved.
Show resolved Hide resolved
@Qualifier(QUALIFIER) StorageComponent storage) {
return sb -> sb.annotatedService("/storage/kafka", ((KafkaStorage) storage).httpService());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ KafkaStorageBuilder toBuilder() {
if (dependencyStoreStreamAppId != null) {
builder.dependencyStoreStreamAppId(dependencyStoreStreamAppId);
}

return builder;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
/**
* Span store backed by Kafka Stream distributed state stores built by {@link
* TraceStoreTopologySupplier} and {@link DependencyStoreTopologySupplier}, and made accessible by
* {@link KafkaStoreHttpService}.
* {@link KafkaStorageHttpService}.
*/
final class KafkaSpanStore implements SpanStore, Traces, ServiceAndSpanNames {
static final ObjectMapper MAPPER = new ObjectMapper();
Expand Down
51 changes: 28 additions & 23 deletions storage/src/main/java/zipkin2/storage/kafka/KafkaStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@
package zipkin2.storage.kafka;

import com.linecorp.armeria.server.Server;
import com.linecorp.armeria.server.ServerBuilder;
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
Expand Down Expand Up @@ -181,7 +183,7 @@ void checkResources() {
if (searchEnabled) {
getTraceStoreStream();
getDependencyStoreStream();
getServer();
//getServer();
}
}

Expand All @@ -207,9 +209,9 @@ void checkResources() {
return CheckResult.failed(
new IllegalStateException("Store stream not running. " + dependencyStateStore));
}
if (!getServer().activePort().isPresent()) {
return CheckResult.failed(new IllegalStateException("Storage HTTP server not running."));
}
//if (!getServer().activePort().isPresent()) {
// return CheckResult.failed(new IllegalStateException("Storage HTTP server not running."));
//}
}
return CheckResult.OK;
} catch (Exception e) {
Expand Down Expand Up @@ -323,27 +325,30 @@ KafkaStreams getAggregationStream() {
return traceAggregationStream;
}

@SuppressWarnings("FutureReturnValueIgnored")
Server getServer() {
if (server == null) {
synchronized (this) {
if (server == null) {
try {
server = Server.builder()
.http(httpPort)
.annotatedService(new KafkaStoreHttpService(this))
.build();
server.start();
} catch (Exception e) {
LOG.error("Error starting http server", e);
server = null;
}
}
}
}
return server;
public KafkaStorageHttpService httpService() {
return new KafkaStorageHttpService(this);
}

//@SuppressWarnings("FutureReturnValueIgnored")
//Server getServer() {
// if (server == null) {
// synchronized (this) {
// if (server == null) {
// try {
// ServerBuilder builder = Server.builder().http(httpPort);
// configureHttpService().accept(builder);
// server = builder.build();
// server.start();
// } catch (Exception e) {
// LOG.error("Error starting http server", e);
// server = null;
// }
// }
// }
// }
// return server;
//}

@Override public String toString() {
return "KafkaStorage{" +
"httpPort=" + httpPort +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ public final class KafkaStorageBuilder extends StorageComponent.Builder {

long minTracesStored = 10_000;
String hostname = "localhost";
int httpPort = 9412;
int httpPort = 9411;
BiFunction<String, Integer, String> httpBaseUrl =
(hostname, port) -> "http://" + hostname + ":" + port;
(hostname, port) -> "http://" + hostname + ":" + port + "/storage/kafka";

String storageDir = "/tmp/zipkin-storage-kafka";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.linecorp.armeria.common.AggregatedHttpResponse;
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.common.MediaType;
import com.linecorp.armeria.server.ServerBuilder;
import com.linecorp.armeria.server.annotation.Default;
import com.linecorp.armeria.server.annotation.Get;
import com.linecorp.armeria.server.annotation.Param;
Expand All @@ -31,7 +30,6 @@
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.KeyValueIterator;
Expand Down Expand Up @@ -65,22 +63,18 @@
* distributed state. This component exposes access to local state via Http call from {@link
* KafkaSpanStore}
*/
final class KafkaStoreHttpService implements Consumer<ServerBuilder> {
final class KafkaStorageHttpService {
static final Logger LOG = LogManager.getLogger();
static final ObjectMapper MAPPER = new ObjectMapper();

final KafkaStorage storage;
final long minTracesStored;

KafkaStoreHttpService(KafkaStorage storage) {
KafkaStorageHttpService(KafkaStorage storage) {
this.storage = storage;
this.minTracesStored = storage.minTracesStored;
}

@Override public void accept(ServerBuilder serverBuilder) {
serverBuilder.annotatedService("/zipkin/storage/kafka", this);
}

@Get("/dependencies")
public AggregatedHttpResponse getDependencies(
@Param("endTs") long endTs,
Expand Down