Refactoring streams and storage (#25)
* chore: update to kstreams 2.3

* refactor: draft aggregation and indexing without lucene

* refactor: dependency link aggregation

* refactor: compilable set of changes;

* chore: fix formatting

* fix: failing tests

* chore: deprecate and format

* chore: update docker image for it

* fix: remote service call and store names

* feat: testing suppress on trace aggregation

* feat: unit testing aggregation

* feat: testing traces retention

* feat: complete streams unit testing

* feat: it testing trace queries;

* feat: it test for dependencies

* refactor: join stream apps into 2

one for aggregation and another for store

* chore: remove old comment

* chore: remove unused plugin

* docs: add methods description

* chore: update versions

* feat: autocomplete tags support

* feat: moving to zipkin/distroless images

* feat: docker image renewed;

* fix: failing test

* chore: clean properties

* fix: execution handling and code reuse

* chore: add todo for async call

* fix: issue with dependency query

* chore: close producer

* chore: increase ti

* chore: trying to chase issue with travis test

* chore: trying to chase issue with travis test

* docs: update stream images

* fix: ensure topics are created before running tests

* chore: remove non needed print exception stack

* chore: update configs and docs;

* chore: var names and logging

* chore: var names and logging

* chore: var names and logging

* chore: var names and logging

* chore: var names and logging

* feat: aligning configs and docker

* fix: topic name in tests

* chore: update docs and configs

* chore: doc mounting custom libs

* fix: dependency-links to dependencies

Signed-off-by: Jorge Quilcate Otoya <quilcate.jorge@gmail.com>

* docs: add steps to test

* fix: issue with inactivity gap variable

* chore: simplify link mapping and clean imports

* docs: add ack to ksteram viz app

* fix: topic name

* chore: add container names

* chore: update versions

* feat: remove topics details and rename vars

* docs: more specific naming

* docs: remove too specific variables and simplify config

* fix: store dir test

* feat: add clients overrides

* fix: overrides

* chore: remove 'all' filter

* chore: rename kafka topic env variables

* docs: update env list

* feat: back to separated stores to support dependency only use-case

additionally, spans are now the source for trace store.

* docs: updating environments and doc of new approach

* chore: simplifying testing

* chore: rename vars

* chore: add suppress warning for ignored future

* feat: rename profile and storage to kafka

* docs: component javadocs

* docs: add graph details

* docs: add graph details

* chore: fix acks

* update base zipkin version

Co-Authored-By: Adrian Cole <adriancole@users.noreply.github.com>

* chore: rename from span to spans topic

* refactor: simplify config names

* docs: add topics list;

* fix: joining spans

* chore: update javadocs, config passing, and minimum traces stored.

* chore: small fixes

* chore: rename vars

* refactor: rename to flush interval

* docs: add details about inactivity gap

* refactor: more renames
jeqo authored Aug 23, 2019
1 parent 661ec18 commit 3e9458d
Showing 59 changed files with 2,244 additions and 2,967 deletions.
3 changes: 0 additions & 3 deletions .mvn/wrapper/MavenWrapperDownloader.java
@@ -17,9 +17,6 @@ Licensed to the Apache Software Foundation (ASF) under one
under the License.
*/

import java.net.*;
import java.io.*;
import java.nio.channels.*;
import java.util.Properties;

public class MavenWrapperDownloader {
155 changes: 99 additions & 56 deletions DESIGN.md
@@ -2,95 +2,138 @@

## Goals

* Provide a fast and reliable storage that enables extensibility via Kafka topics.
* Provide full storage functionality via streaming aggregations (e.g., dependency graph).
* Create a processing space where additional enrichment can be plugged in into the processing
pipeline.
* Remove need for additional storage when Kafka is available.
* More focused on supporting processing than storage: traces and dependency links are emitted
downstream to support metrics aggregation. Storage is currently supported but in a single node.

### Zipkin Storage Component
## Kafka Zipkin Storage

A Zipkin Storage component has the following internal parts:
Storage is composed of 3 main components:

* `Builder`: which configures if
- `strictTraceId(boolean strictTraceId)`
- `searchEnabled(boolean searchEnabled)`
- `autocompleteKeys(List<String> keys)`
- `autocompleteTtl(int autocompleteTtl)`
- `autocompleteCardinality(int autocompleteCardinality)`
* `SpanStore`: main component
- `Call<List<List<Span>>> getTraces(QueryRequest request);`
- `Call<List<Span>> getTrace(String traceId);`
- `Call<List<String>> getServiceNames();`
- `Call<List<String>> getSpanNames(String serviceName);`
- `Call<List<DependencyLink>> getDependencies(long endTs, long lookback);`
* `SpanConsumer`: which ingest spans
- `Call<Void> accept(List<Span> spans)`
* `QueryRequest`: which includes
- `String serviceName, spanName;`
- `Map<String, String> annotationQuery;`
- `Long minDuration, maxDuration;`
- `long endTs, lookback;`
- `int limit;`
- Span Consumer: repartition of collected span batches into individual spans keyed by `traceId`
- Span Aggregation: stream processing of spans into aggregated traces and then into dependency links.
- Span Store: building local state stores to support search and query API.

### Kafka Zipkin Storage
And it is supported by 3 main Kafka topics:

#### `KafkaSpanStore`
- `zipkin-spans`: Topic where lists of spans indexed by trace ID are stored.
- `zipkin-trace`: Topic where aggregated traces are stored.
- `zipkin-dependency`: Topic where dependency links per trace are stored.

Span Store is expecting Spans to be stored in topics partitioned by `TraceId`.
### Kafka Span Consumer

> These can be created by Span Consumer, or can be **enriched** by other Stream Processors, outside of
Zipkin Server.
This component processes collected span batches (via HTTP, Kafka, ActiveMQ, etc.),
takes each element, and re-indexes it by `traceId` on the "spans" topic.

Kafka Span Store will need to support different kind of queries:
This component currently compensates for how `KafkaSender` (part of [Zipkin-Reporter](https://github.com/openzipkin/zipkin-reporter-java))
reports spans to Kafka: spans are grouped into batches and sent to an un-keyed
Kafka topic.

Component source code: [KafkaSpanConsumer.java](storage/src/main/java/zipkin2/storage/kafka/KafkaSpanConsumer.java)
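The re-keying step can be sketched as follows. This is a simplified, in-memory stand-in for the actual `KafkaSpanConsumer` (which keys Kafka producer records by trace ID); the `Span` record below is a hypothetical pair, not `zipkin2.Span`.

```java
import java.util.*;

// Sketch only: groups an un-keyed collected batch into traceId -> spans,
// the shape the "spans" topic ends up with once each span is produced
// with its trace ID as the record key.
class SpanRepartitioner {
  record Span(String traceId, String name) {}

  static Map<String, List<Span>> keyByTraceId(List<Span> batch) {
    Map<String, List<Span>> keyed = new LinkedHashMap<>();
    for (Span span : batch) {
      keyed.computeIfAbsent(span.traceId(), k -> new ArrayList<>()).add(span);
    }
    return keyed;
  }
}
```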

##### Get Service Names/Get Span Names
### Stream Processing

Service name to Span names pairs are indexed by aggregating spans.
#### Span Aggregation

##### Get Trace/Find Traces
"Partitioned" spans are processed to produce two aggregated streams: `Traces` and `Dependencies`.

When search requests are received, span index is used to search for trace ids. After a list is
retrieved, trace DAG is retrieved from trace state store.
**Traces**:

##### Get Dependencies
Spans are grouped by trace ID and stored in a local
[session window](https://kafka.apache.org/23/javadoc/org/apache/kafka/streams/kstream/SessionWindows.html),
where the `traceId` becomes the session token and `trace-timeout` (default: 1 minute)
(i.e., the period without receiving a span for the same session; also known as the session inactivity gap
in Kafka Streams)
defines whether a trace is still active. This is evaluated on the next span received on the stream,
regardless of its `traceId`. When a session window closes, a trace message is emitted to the
traces topic.

After `spans` are aggregated into traces, traces are processed to collect dependencies.
The dependency changelog is stored in a Kafka topic, to be materialized as a view on
Zipkin instances.
![Session Windows](https://kafka.apache.org/20/images/streams-session-windows-02.png)

### Stream processors
> Each color represents a trace. The longer the `trace timeout`, the longer we wait
to close a window and to emit traces downstream for dependency-link and additional
aggregations, but the more consistent the trace aggregation is.
If we choose a smaller gap, we emit traces faster, at the risk of breaking traces into
smaller chunks and potentially affecting counters downstream.
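The inactivity-gap decision can be simulated in a few lines of plain Java. This is NOT Kafka Streams code; the 1-minute default comes from the text above, everything else is illustrative.

```java
import java.util.*;

// Minimal simulation of the session-window "trace timeout" behaviour:
// a trace whose gap since its last span exceeds the timeout is considered
// closed and would be emitted to the traces topic.
class TraceSessionizer {
  static final long TRACE_TIMEOUT_MS = 60_000; // default session inactivity gap

  static List<String> closedTraces(Map<String, Long> lastSeenMs, long nowMs) {
    List<String> closed = new ArrayList<>();
    for (Map.Entry<String, Long> e : lastSeenMs.entrySet()) {
      if (nowMs - e.getValue() > TRACE_TIMEOUT_MS) closed.add(e.getKey());
    }
    return closed;
  }
}
```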

#### Trace Aggregation Stream Processor
**Dependencies**

This is the main processor that takes incoming spans and aggregates them into:
Once `traces` are emitted downstream as part of the initial processing, dependency links are evaluated
on each trace and emitted to the dependencies topic for further metric aggregation.
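A hedged sketch of the per-trace link evaluation: count parent→child service call pairs in one aggregated trace. The `Span` record here is a hypothetical `(service, parentService)` pair, not `zipkin2.Span`, and the real implementation may differ.

```java
import java.util.*;

// Sketch only: derive dependency-link call counts from one trace's spans.
class DependencyLinker {
  record Span(String service, String parentService) {}

  static Map<String, Long> linksOf(List<Span> trace) {
    Map<String, Long> callCounts = new TreeMap<>();
    for (Span span : trace) {
      if (span.parentService() != null) {
        callCounts.merge(span.parentService() + "->" + span.service(), 1L, Long::sum);
      }
    }
    return callCounts;
  }
}
```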

- Traces
- Dependencies
Kafka Streams topology:

![service aggregation](docs/service-aggregation-stream.png)
![trace aggregation](docs/trace-aggregation-topology.png)

![dependency aggregation](docs/dependency-aggregation-stream.png)
#### Trace Store Stream

#### Store Stream Processors
This component builds local stores from state received on the `spans` Kafka topic,
for traces, service names, and autocomplete tags.

Global tables for traces, service names and dependencies to be available on local state.
Kafka Streams source code: [TraceStoreTopologySupplier](storage/src/main/java/zipkin2/storage/kafka/streams/TraceStoreTopologySupplier.java)

![trace store](docs/trace-store-stream.png)
Kafka Streams topology:

![service store](docs/service-store-stream.png)
![trace store](docs/trace-store-topology.png)

![dependency store](docs/dependency-store-stream.png)
#### Dependency Store

#### Index Stream Processor
This component builds a local store from state received on the `dependency` Kafka topic.

Custom processor to full-text indexing of traces using Lucene as back-end.
It builds 1-minute time windows in which it counts calls and errors.

![span index](docs/span-index-stream.png)
Kafka Streams source code: [DependencyStoreTopologySupplier](storage/src/main/java/zipkin2/storage/kafka/streams/DependencyStoreTopologySupplier.java)

#### Retention Stream Processor
Kafka Streams topology:

This is the processor that keeps track of trace timestamps for cleanup.
![dependency store](docs/dependency-store-topology.png)

![trace retention](docs/trace-retention-stream.png)
### Kafka Span Store

This component supports the search and query APIs on top of the local state stores built by the
store Kafka Streams components.

Component source code: [KafkaSpanStore.java](storage/src/main/java/zipkin2/storage/kafka/KafkaSpanStore.java)

#### Get Service Names/Get Span Names/Get Remote Service Names

These queries are supported by service-name indexed stores built from the `spans` Kafka topic.

Store names:

- `zipkin-service-names`: key/value store with service name as key and value.
- `zipkin-span-names`: key/value store with service name as key and span names list as value.
- `zipkin-remote-service-names`: key/value store with service name as key and remote service names as value.

#### Get Trace/Find Traces

These queries are supported by two key value stores:

- `zipkin-traces`: indexed by `traceId`; contains the span list as received from the `spans` Kafka topic.
- `zipkin-traces-by-timestamp`: list of trace IDs indexed by `timestamp`.

The `GetTrace` query is supported by the `zipkin-traces` store.
`FindTraces` is supported by both: when a query request is received, its time range is used to get
trace IDs, and then the query criteria are tested against each trace to build the response.
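The two-store lookup can be sketched as below, with an in-memory `NavigableMap` standing in for the `zipkin-traces-by-timestamp` state store (one trace ID per timestamp for brevity; the real store may hold lists).

```java
import java.util.*;

// Sketch only: range-scan the timestamp index over [endTs - lookback, endTs]
// to get candidate trace IDs; each candidate would then be fetched from the
// zipkin-traces store and tested against the query.
class TraceLookup {
  static List<String> findTraceIds(NavigableMap<Long, String> byTimestamp,
                                   long endTs, long lookback) {
    return new ArrayList<>(
        byTimestamp.subMap(endTs - lookback, true, endTs, true).values());
  }
}
```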

#### Get Dependencies

This query is supported by the 1-minute windowed store built by `DependencyStoreStream`.

When a request is received, its time range is used to pick the valid windows and join their counters.

Windowed store:

- `zipkin-dependencies`.
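Joining the windowed counters can be sketched as follows; the nested-map layout is illustrative only, not the actual windowed-store API.

```java
import java.util.*;

// Sketch only: pick the 1-minute windows whose start falls inside
// [endTs - lookback, endTs] and sum the per-link call counts.
class DependencyWindows {
  static Map<String, Long> join(Map<Long, Map<String, Long>> windowsByStartMs,
                                long endTs, long lookback) {
    Map<String, Long> totals = new TreeMap<>();
    for (Map.Entry<Long, Map<String, Long>> w : windowsByStartMs.entrySet()) {
      if (w.getKey() >= endTs - lookback && w.getKey() <= endTs) {
        w.getValue().forEach((link, count) -> totals.merge(link, count, Long::sum));
      }
    }
    return totals;
  }
}
```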

### Kafka Autocomplete Tags

#### Get Keys/Get Values

Supported by a key-value store containing the lists of values collected for the configured `autocompleteKeys`.

- `zipkin-autocomplete-tags`: key-value store.
53 changes: 34 additions & 19 deletions Dockerfile
@@ -12,34 +12,49 @@
# the License.
#

FROM openjdk:8
FROM alpine

ARG KAFKA_STORAGE_VERSION=0.1.1
ENV ZIPKIN_REPO https://repo1.maven.org/maven2
ENV ZIPKIN_VERSION 2.16.1
ENV KAFKA_STORAGE_VERSION 0.4.1-SNAPSHOT

ENV ZIPKIN_REPO https://jcenter.bintray.com
ENV ZIPKIN_VERSION 2.12.6
ENV ZIPKIN_LOGGING_LEVEL INFO
WORKDIR /zipkin

RUN apk add unzip curl --no-cache && \
curl -SL $ZIPKIN_REPO/io/zipkin/zipkin-server/$ZIPKIN_VERSION/zipkin-server-$ZIPKIN_VERSION-exec.jar > zipkin-server.jar && \
# don't break when unzip finds an extra header https://github.com/openzipkin/zipkin/issues/1932
unzip zipkin-server.jar ; \
rm zipkin-server.jar

COPY autoconfigure/target/zipkin-autoconfigure-storage-kafka-${KAFKA_STORAGE_VERSION}-module.jar BOOT-INF/lib/kafka-module.jar
RUN unzip -o BOOT-INF/lib/kafka-module.jar lib/* -d BOOT-INF

FROM gcr.io/distroless/java:11-debug

# Use to set heap, trust store or other system properties.
ENV JAVA_OPTS -Djava.security.egd=file:/dev/./urandom

RUN ["/busybox/sh", "-c", "adduser -g '' -D zipkin"]

# Add environment settings for supported storage types
ENV STORAGE_TYPE kafka

COPY --from=0 /zipkin/ /zipkin/
WORKDIR /zipkin

RUN curl -SL $ZIPKIN_REPO/io/zipkin/java/zipkin-server/$ZIPKIN_VERSION/zipkin-server-${ZIPKIN_VERSION}-exec.jar > zipkin.jar
# TODO haven't found a better way to mount libs from custom storage. issue #28
#COPY autoconfigure/target/zipkin-autoconfigure-storage-kafka-${KAFKA_STORAGE_VERSION}-module.jar kafka-module.jar
#ENV MODULE_OPTS -Dloader.path='BOOT-INF/lib/kafka-module.jar,BOOT-INF/lib/kafka-module.jar!/lib' -Dspring.profiles.active=kafka
ENV MODULE_OPTS -Dspring.profiles.active=kafka

RUN ["/busybox/sh", "-c", "ln -s /busybox/* /bin"]

ADD storage/target/zipkin-storage-kafka-${KAFKA_STORAGE_VERSION}.jar zipkin-storage-kafka.jar
ADD autoconfigure/target/zipkin-autoconfigure-storage-kafka-${KAFKA_STORAGE_VERSION}-module.jar zipkin-autoconfigure-storage-kafka.jar
ENV KAFKA_STORAGE_DIR /data
RUN mkdir /data && chown zipkin /data
VOLUME /data

ENV STORAGE_TYPE=kafkastore
USER zipkin

EXPOSE 9410 9411
EXPOSE 9411

CMD exec java \
${JAVA_OPTS} \
-Dloader.path='zipkin-storage-kafka.jar,zipkin-autoconfigure-storage-kafka.jar' \
-Dspring.profiles.active=kafkastore \
-Dcom.linecorp.armeria.annotatedServiceExceptionVerbosity=all \
-Dcom.linecorp.armeria.verboseExceptions=true \
-cp zipkin.jar \
org.springframework.boot.loader.PropertiesLauncher \
--logging.level.zipkin2=${ZIPKIN_LOGGING_LEVEL}
ENTRYPOINT ["/busybox/sh", "-c", "exec java ${MODULE_OPTS} ${JAVA_OPTS} -cp . org.springframework.boot.loader.PropertiesLauncher"]
43 changes: 33 additions & 10 deletions Makefile
Expand Up @@ -3,23 +3,33 @@ all: build

OPEN := 'xdg-open'
MAVEN := './mvnw'
VERSION := '0.3.3-SNAPSHOT'
VERSION := '0.4.1-SNAPSHOT'
IMAGE_NAME := 'jeqo/zipkin-kafka'

.PHONY: run
run: build zipkin-local

.PHONY: run-docker
run-docker: build docker-build docker-up

.PHONY: kafka-topics
kafka-topics:
docker-compose exec kafka-zookeeper /busybox/sh /kafka/bin/kafka-run-class.sh kafka.admin.TopicCommand \
--zookeeper localhost:2181 --create --topic zipkin-spans --partitions 1 --replication-factor 1 --if-not-exists
docker-compose exec kafka-zookeeper /busybox/sh /kafka/bin/kafka-run-class.sh kafka.admin.TopicCommand \
--zookeeper localhost:2181 --create --topic zipkin-trace --partitions 1 --replication-factor 1 --if-not-exists
docker-compose exec kafka-zookeeper /busybox/sh /kafka/bin/kafka-run-class.sh kafka.admin.TopicCommand \
--zookeeper localhost:2181 --create --topic zipkin-dependency --partitions 1 --replication-factor 1 --if-not-exists

.PHONY: docker-build
docker-build:
TAG=${VERSION} \
docker-compose build
docker build -t ${IMAGE_NAME}:latest .
docker build -t ${IMAGE_NAME}:${VERSION} .

.PHONY: docker-push
docker-push: docker-build
TAG=${VERSION} \
docker-compose push
docker push ${IMAGE_NAME}:latest
docker push ${IMAGE_NAME}:${VERSION}

.PHONY: docker-up
docker-up:
@@ -33,7 +43,7 @@ docker-down:

.PHONY: docker-kafka-up
docker-kafka-up:
docker-compose up -d kafka zookeeper
docker-compose up -d kafka-zookeeper

.PHONY: license-header
license-header:
@@ -49,22 +59,35 @@ test: build

.PHONY: zipkin-local
zipkin-local:
STORAGE_TYPE=kafkastore \
STORAGE_TYPE=kafka \
KAFKA_BOOTSTRAP_SERVERS=localhost:19092 \
java \
-Dloader.path='storage/target/zipkin-storage-kafka-${VERSION}.jar,autoconfigure/target/zipkin-autoconfigure-storage-kafka-${VERSION}-module.jar' \
-Dspring.profiles.active=kafkastore \
-Dloader.path='autoconfigure/target/zipkin-autoconfigure-storage-kafka-${VERSION}-module.jar,autoconfigure/target/zipkin-autoconfigure-storage-kafka-${VERSION}-module.jar!/lib' \
-Dspring.profiles.active=kafka \
-cp zipkin.jar \
org.springframework.boot.loader.PropertiesLauncher

.PHONY: get-zipkin
get-zipkin:
curl -sSL https://zipkin.io/quickstart.sh | bash -s

.PHONY: zipkin-test-multi
zipkin-test-multi:
curl -s https://raw.githubusercontent.com/openzipkin/zipkin/master/zipkin-lens/testdata/netflix.json | \
curl -X POST -s localhost:9411/api/v2/spans -H'Content-Type: application/json' -d @- ; \
${OPEN} 'http://localhost:9412/zipkin/?lookback=custom&startTs=1'
sleep 61
curl -s https://raw.githubusercontent.com/openzipkin/zipkin/master/zipkin-lens/testdata/messaging.json | \
curl -X POST -s localhost:9411/api/v2/spans -H'Content-Type: application/json' -d @- ; \

.PHONY: zipkin-test
zipkin-test:
curl -s https://raw.githubusercontent.com/openzipkin/zipkin/master/zipkin-ui/testdata/netflix.json | \
curl -s https://raw.githubusercontent.com/openzipkin/zipkin/master/zipkin-lens/testdata/netflix.json | \
curl -X POST -s localhost:9411/api/v2/spans -H'Content-Type: application/json' -d @- ; \
${OPEN} 'http://localhost:9411/zipkin/?lookback=custom&startTs=1'
sleep 61
curl -s https://raw.githubusercontent.com/openzipkin/zipkin/master/zipkin-lens/testdata/messaging.json | \
curl -X POST -s localhost:9411/api/v2/spans -H'Content-Type: application/json' -d @- ; \

.PHONY: release
release:
