Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Hostname variable #49

Merged
merged 4 commits into from
Oct 8, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions autoconfigure/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,32 @@

| Configuration | Description | Default |
|---------------|-------------|---------|
| `KAFKA_STORAGE_HOST_NAME` | Host name used by storage instances to scatter-gather results | `localhost` |
| `KAFKA_STORAGE_DIR` | Root path where Zipkin stores tracing data | `/tmp/zipkin-storage-kafka` |
| `KAFKA_STORAGE_TRACE_TIMEOUT` | How long to wait until a trace window is closed (ms). If this config is to small, dependency links won't be caught and metrics may drift. | `600000` (1 minute) |
| `KAFKA_STORAGE_TRACE_TTL` | How long to keep traces stored. | `259200000` (3 days) |
| `KAFKA_STORAGE_DEPENDENCY_TTL` | How long to keep dependencies stored. | `604800000` (1 week) |

### When Kubernetes/Openshift

When running on Kubernetes/Openshift is recommended to use `statefulsets` in order to maintain
storage state directories.

#### Configure hostname

For instances to access other pods on the stateful set, we have to use valid DNS-names:

```yaml
env:
- name: STORAGE_TYPE
value: kafka
# Gather hostname (name-${POD_ID}) from metadata
- name: HOSTNAME
valueFrom:
fieldRef:
fieldPath: metadata.name
# Mapping hostname to Kubernetes DNS defined service name (${NAME}-${POD_ID}.${SVC}.${NAMESPACE}.svc.cluster.local),
# then instance storage becomes accessible between them
- name: KAFKA_STORAGE_HOST_NAME
value: $(HOSTNAME).zipkin.default.svc.cluster.local
```
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ public class ZipkinKafkaStorageProperties implements Serializable {

private Boolean spanConsumerEnabled;

private String hostname;

private String bootstrapServers;

private Long traceTtlCheckInterval;
Expand Down Expand Up @@ -57,6 +59,7 @@ public class ZipkinKafkaStorageProperties implements Serializable {
KafkaStorageBuilder toBuilder() {
KafkaStorageBuilder builder = KafkaStorage.newBuilder();
if (spanConsumerEnabled != null) builder.spanConsumerEnabled(spanConsumerEnabled);
if (hostname != null) builder.hostname(hostname);
if (bootstrapServers != null) builder.bootstrapServers(bootstrapServers);
if (traceTimeout != null) {
builder.traceTimeout(Duration.ofMillis(traceTimeout));
Expand Down Expand Up @@ -99,7 +102,11 @@ KafkaStorageBuilder toBuilder() {
return builder;
}

public void setSpanConsumerEnabled(boolean spanConsumerEnabled) {
public Boolean getSpanConsumerEnabled() {
return spanConsumerEnabled;
}

public void setSpanConsumerEnabled(Boolean spanConsumerEnabled) {
this.spanConsumerEnabled = spanConsumerEnabled;
}

Expand All @@ -111,6 +118,14 @@ public void setBootstrapServers(String bootstrapServers) {
this.bootstrapServers = bootstrapServers;
}

public String getHostname() {
return hostname;
}

public void setHostname(String hostname) {
this.hostname = hostname;
}

public Long getTraceTtlCheckInterval() {
return traceTtlCheckInterval;
}
Expand Down Expand Up @@ -143,14 +158,6 @@ public void setSpansTopic(String spansTopic) {
this.spansTopic = spansTopic;
}

public Boolean getSpanConsumerEnabled() {
return spanConsumerEnabled;
}

public void setSpanConsumerEnabled(Boolean spanConsumerEnabled) {
this.spanConsumerEnabled = spanConsumerEnabled;
}

public String getTraceTopic() {
return traceTopic;
}
Expand Down
3 changes: 2 additions & 1 deletion autoconfigure/src/main/resources/zipkin-server-kafka.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
zipkin:
storage:
kafka:
hostname: ${KAFKA_STORAGE_HOST_NAME:${HOSTNAME:localhost}}
# Connection to Kafka
bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:localhost:9092}
# Kafka topic names
Expand All @@ -19,4 +20,4 @@ zipkin:
trace-timeout: ${KAFKA_STORAGE_TRACE_TIMEOUT:60000}
trace-ttl: ${KAFKA_STORAGE_TRACE_TTL:259200000}
trace-ttl-check-interval: ${KAFKA_STORAGE_TRACE_TTL_CHECK_INTERVAL:3600000}
dependency-ttl: ${KAFKA_STORAGE_DEPENDENCY_TTL:604800000}
dependency-ttl: ${KAFKA_STORAGE_DEPENDENCY_TTL:604800000}
Original file line number Diff line number Diff line change
Expand Up @@ -168,4 +168,16 @@ public class ZipkinKafkaStorageAutoConfigurationTest {
assertThat(context.getBean(KafkaStorage.class).dependencyTopicName).isEqualTo(
"zipkin-dependencies-1");
}

@Test void canOverridesProperty_hostname() {
TestPropertyValues.of(
"zipkin.storage.type:kafka",
"zipkin.storage.kafka.hostname:other_host"
).applyTo(context);
Access.registerKafka(context);
context.refresh();

assertThat(context.getBean(KafkaStorage.class).hostname).isEqualTo(
"other_host");
}
}
2 changes: 2 additions & 0 deletions storage/src/main/java/zipkin2/storage/kafka/KafkaStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public static KafkaStorageBuilder newBuilder() {
// Kafka Storage configs
final String storageDir;
final long minTracesStored;
final String hostname;
final int httpPort;
// Kafka Topics
final String spansTopicName, traceTopicName, dependencyTopicName;
Expand Down Expand Up @@ -97,6 +98,7 @@ public static KafkaStorageBuilder newBuilder() {
// Storage directories
this.storageDir = builder.storageDir;
this.minTracesStored = builder.minTracesStored;
this.hostname = builder.hostname;
this.httpPort = builder.httpPort;
this.httpBaseUrl = builder.httpBaseUrl;
// Kafka Configs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public final class KafkaStorageBuilder extends StorageComponent.Builder {
Duration dependencyWindowSize = Duration.ofMinutes(1);

long minTracesStored = 10_000;
String hostname = "localhost";
int httpPort = 9412;
BiFunction<String, Integer, String> httpBaseUrl =
(hostname, port) -> "http://" + hostname + ":" + port;
Expand Down Expand Up @@ -100,13 +101,7 @@ public final class KafkaStorageBuilder extends StorageComponent.Builder {
}

String hostInfo() {
String hostInfo = "localhost";
try {
hostInfo = InetAddress.getLocalHost().getHostName() + ":" + httpPort;
} catch (UnknownHostException e) {
e.printStackTrace();
}
return hostInfo;
return hostname + ":" + httpPort;
}

@Override public KafkaStorageBuilder strictTraceId(boolean strictTraceId) {
Expand Down Expand Up @@ -135,6 +130,14 @@ public KafkaStorageBuilder spanConsumerEnabled(boolean spanConsumerEnabled) {
return this;
}

public KafkaStorageBuilder hostname(String hostname) {
if (hostname == null) throw new NullPointerException("hostname == null");
this.hostname = hostname;
traceStoreStreamConfig.put(StreamsConfig.APPLICATION_SERVER_CONFIG, hostInfo());
dependencyStoreStreamConfig.put(StreamsConfig.APPLICATION_SERVER_CONFIG, hostInfo());
return this;
}

public KafkaStorageBuilder httpPort(int httpPort) {
this.httpPort = httpPort;
traceStoreStreamConfig.put(StreamsConfig.APPLICATION_SERVER_CONFIG, hostInfo());
Expand All @@ -146,9 +149,7 @@ public KafkaStorageBuilder httpPort(int httpPort) {
* How long to wait for a span in order to trigger a trace as completed.
*/
public KafkaStorageBuilder traceTimeout(Duration traceTimeout) {
if (traceTimeout == null) {
throw new NullPointerException("traceTimeout == null");
}
if (traceTimeout == null) throw new NullPointerException("traceTimeout == null");
this.traceTimeout = traceTimeout;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,23 @@

final class KafkaStreamsMetadata {
static KafkaStreamsMetadata create(
Collection<org.apache.kafka.streams.state.StreamsMetadata> other) {
Collection<org.apache.kafka.streams.state.StreamsMetadata> other) {
KafkaStreamsMetadata metadata = new KafkaStreamsMetadata();
metadata.metadata = other.stream().map(StreamsMetadata::create).collect(Collectors.toSet());
return metadata;
}

Set<StreamsMetadata> metadata;

Set<StreamsMetadata> getMetadata() {
KafkaStreamsMetadata() {
}

public void setMetadata(
Set<StreamsMetadata> metadata) {
this.metadata = metadata;
}

public Set<StreamsMetadata> getMetadata() {
return metadata;
}

Expand All @@ -37,15 +45,31 @@ static StreamsMetadata create(org.apache.kafka.streams.state.StreamsMetadata oth
metadata.hostInfo = HostInfo.create(other.hostInfo());
metadata.storeNames = other.stateStoreNames();
metadata.topicPartitions = other.topicPartitions().stream()
.map(TopicPartition::create)
.collect(Collectors.toSet());
.map(TopicPartition::create)
.collect(Collectors.toSet());
return metadata;
}

HostInfo hostInfo;
Set<String> storeNames;
Set<TopicPartition> topicPartitions;

StreamsMetadata() {
}

public void setHostInfo(HostInfo hostInfo) {
this.hostInfo = hostInfo;
}

public void setStoreNames(Set<String> storeNames) {
this.storeNames = storeNames;
}

public void setTopicPartitions(
Set<TopicPartition> topicPartitions) {
this.topicPartitions = topicPartitions;
}

public HostInfo getHostInfo() {
return hostInfo;
}
Expand All @@ -69,6 +93,17 @@ static HostInfo create(org.apache.kafka.streams.state.HostInfo other) {
String host;
Integer port;

HostInfo() {
}

public void setHost(String host) {
this.host = host;
}

public void setPort(Integer port) {
this.port = port;
}

public String getHost() {
return host;
}
Expand All @@ -86,9 +121,20 @@ static TopicPartition create(org.apache.kafka.common.TopicPartition other) {
return topicPartition;
}

TopicPartition() {
}

String topic;
Integer partition;

public void setTopic(String topic) {
this.topic = topic;
}

public void setPartition(Integer partition) {
this.partition = partition;
}

public String getTopic() {
return topic;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,6 @@ class KafkaStorageIT {
.limit(1)
.build())
.execute();

assertThat(filteredTraces).hasSize(1);
assertThat(filteredTraces.get(0)).hasSize(1); // last trace is returned first
List<String> services = serviceAndSpanNames.getServiceNames().execute();
Expand Down