Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement message queue #457

Merged
merged 34 commits into from
Dec 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
d701fe2
feat: add Kafka and write orders to 'orders' Topic
secustor Oct 17, 2022
a975e1f
feat: add Accountingservice which consumes orders from Kafka
secustor Oct 17, 2022
927ed69
feat: add Frauddetectionservice which consumes orders from Kafka
secustor Oct 26, 2022
ed23f1a
Merge branch 'main' into implement_message_queue
secustor Oct 26, 2022
4bd0962
docs: add documentation for new services
secustor Oct 26, 2022
76b5bbc
docs: fix linting
secustor Oct 26, 2022
1fe2b69
feat: se-/deserialize using protobuf
secustor Oct 27, 2022
e289b03
docs: add service docs for accountingservice and frauddetectionservice
secustor Oct 27, 2022
61f1325
docs: fix linting
secustor Oct 27, 2022
4a82ed6
docs: add changelog
secustor Oct 27, 2022
c2aa434
docs: implement change requests
secustor Oct 31, 2022
cd89bda
refactor: use .env variable for broker ports
secustor Nov 2, 2022
2d98fe5
fix: add slf4j api to prevent warning during java agent initialization
secustor Nov 2, 2022
a65289e
Merge branch 'main' into implement_message_queue
julianocosta89 Nov 10, 2022
61555b2
feat: use KRAFT instead of Kafka with Zookeeper
secustor Nov 11, 2022
de48f63
Merge remote-tracking branch 'origin/implement_message_queue' into im…
secustor Nov 11, 2022
0a3c21a
Merge branch 'main' into implement_message_queue
secustor Nov 11, 2022
64f9089
fix(accountingservice): add protobuf-dev as as build dependency
secustor Nov 13, 2022
9df0b71
Merge branch 'main' into implement_message_queue
secustor Nov 15, 2022
b0281a2
docs: re-add services to architecture graph
secustor Nov 16, 2022
3e440b8
Merge remote-tracking branch 'origin/implement_message_queue' into im…
secustor Nov 16, 2022
3be3a31
Merge branch 'main' into implement_message_queue
secustor Nov 21, 2022
19c124e
Merge branch 'main' into implement_message_queue
secustor Nov 22, 2022
d9cec2b
docs(architecture_graph): rename accounting to accountingservice
secustor Nov 22, 2022
373b8c5
Merge branch 'main' into implement_message_queue
secustor Nov 28, 2022
1acc161
chore: remove licence reference from new services
secustor Nov 28, 2022
28e9637
feat(frauddetection): use eclipse-temurin 17 instead of openjdk 18
secustor Nov 28, 2022
157ded3
Merge branch 'main' into implement_message_queue
julianocosta89 Nov 29, 2022
6e5606e
Merge branch 'main' into implement_message_queue
austinlparker Dec 2, 2022
87c2554
Merge branch 'main' into implement_message_queue
secustor Dec 6, 2022
98ee2fb
minimize log
puckpuck Dec 7, 2022
e6aa7b6
update frauddetectionservice name
puckpuck Dec 7, 2022
e431bc1
add accounting service
puckpuck Dec 7, 2022
85f0e75
use kafka container name with healthchecks
puckpuck Dec 7, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .env
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,9 @@ GRAFANA_SERVICE_HOST=grafana
JAEGER_SERVICE_PORT=16686
JAEGER_SERVICE_HOST=jaeger

# Kafka
KAFKA_SERVICE_PORT=9092
KAFKA_SERVICE_ADDR=kafka:${KAFKA_SERVICE_PORT}
ZOOKEEPER_SERVICE_PORT=2181

ENV_PLATFORM=local
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,3 +146,5 @@ significant modifications will be credited to OpenTelemetry Authors.
([#600](https://github.com/open-telemetry/opentelemetry-demo/pull/600))
* Add HTTP client instrumentation to shippingservice
([#610](https://github.com/open-telemetry/opentelemetry-demo/pull/610))
* Added Kafka, accountingservice and frauddetectionservice for async workflows
([#512](https://github.com/open-telemetry/opentelemetry-demo/pull/457))
117 changes: 106 additions & 11 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,34 @@ networks:
driver: bridge

services:
# Ad service
# Accounting service
accountingservice:
image: ${IMAGE_NAME}:${IMAGE_VERSION}-accountingservice
container_name: accounting-service
build:
context: ./
dockerfile: ./src/accountingservice/Dockerfile
cache_from:
- ${IMAGE_NAME}:${IMAGE_VERSION}-accountingservice
deploy:
resources:
limits:
memory: 300M
restart: always
environment:
- KAFKA_SERVICE_ADDR
- OTEL_EXPORTER_OTLP_TRACES_ENDPOINT
- OTEL_EXPORTER_OTLP_METRICS_ENDPOINT
- OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE
- OTEL_SERVICE_NAME=accountingservice
depends_on:
otelcol:
condition: service_started
kafka:
condition: service_healthy
logging: *logging

# AdService
adservice:
image: ${IMAGE_NAME}:${IMAGE_VERSION}-adservice
container_name: ad-service
Expand Down Expand Up @@ -102,18 +129,28 @@ services:
- PAYMENT_SERVICE_ADDR
- PRODUCT_CATALOG_SERVICE_ADDR
- SHIPPING_SERVICE_ADDR
- KAFKA_SERVICE_ADDR
- OTEL_EXPORTER_OTLP_TRACES_ENDPOINT
- OTEL_EXPORTER_OTLP_METRICS_ENDPOINT
- OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE
- OTEL_SERVICE_NAME=checkoutservice
depends_on:
- cartservice
- currencyservice
- emailservice
- paymentservice
- productcatalogservice
- shippingservice
- otelcol
cartservice:
condition: service_started
currencyservice:
condition: service_started
emailservice:
condition: service_started
paymentservice:
condition: service_started
productcatalogservice:
condition: service_started
shippingservice:
condition: service_started
otelcol:
condition: service_started
kafka:
condition: service_healthy
logging: *logging

# Currency service
Expand Down Expand Up @@ -196,6 +233,33 @@ services:
condition: service_healthy
logging: *logging

# Fraud Detection service
frauddetectionservice:
image: ${IMAGE_NAME}:${IMAGE_VERSION}-frauddetectionservice
container_name: frauddetection-service
build:
context: ./
dockerfile: ./src/frauddetectionservice/Dockerfile
cache_from:
- ${IMAGE_NAME}:${IMAGE_VERSION}-frauddetectionservice
deploy:
resources:
limits:
memory: 200M
restart: always
environment:
- KAFKA_SERVICE_ADDR
- OTEL_EXPORTER_OTLP_TRACES_ENDPOINT
- OTEL_EXPORTER_OTLP_METRICS_ENDPOINT
- OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE
- OTEL_SERVICE_NAME=frauddetectionservice
depends_on:
otelcol:
condition: service_started
kafka:
condition: service_healthy
logging: *logging

# Frontend
frontend:
image: ${IMAGE_NAME}:${IMAGE_VERSION}-frontend
Expand Down Expand Up @@ -240,7 +304,7 @@ services:
- shippingservice
logging: *logging

# Frontend
# Frontend Proxy (Envoy)
frontendproxy:
image: ${IMAGE_NAME}:${IMAGE_VERSION}-frontendproxy
build:
Expand Down Expand Up @@ -444,7 +508,7 @@ services:
deploy:
resources:
limits:
memory: 120M
memory: 200M
restart: always
environment:
- POSTGRES_USER=ffs
Expand All @@ -457,6 +521,38 @@ services:
timeout: 5s
retries: 5

# Kafka used by Checkout, Accounting, and Fraud Detection services
kafka:
image: confluentinc/cp-kafka:7.2.2-1-ubi8
hostname: kafka
container_name: kafka
environment:
KAFKA_LISTENERS: PLAINTEXT://kafka:9092,CONTROLLER://kafka:9093
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_PROCESS_ROLES: 'controller,broker'
KAFKA_NODE_ID: 1
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka:9093'
KAFKA_METADATA_LOG_SEGMENT_MS: 15000
KAFKA_METADATA_MAX_RETENTION_MS: 1200000
KAFKA_METADATA_LOG_MAX_RECORD_BYTES_BETWEEN_SNAPSHOTS: 2800
KAFKA_AUTO_CREATE_TOPICS_ENABLE: true
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
volumes:
- ./src/kafka/update_run.sh:/tmp/update_run.sh
- ./src/kafka/clusterID:/tmp/clusterID
command: "bash -c 'ls -lh /tmp && /tmp/update_run.sh && /etc/confluent/docker/run'"
logging: *logging
healthcheck:
test: nc -z kafka 9092
start_period: 10s
interval: 5s
timeout: 10s
retries: 10

# Jaeger
jaeger:
image: jaegertracing/all-in-one
Expand Down Expand Up @@ -538,7 +634,6 @@ services:
- "${REDIS_PORT}"
logging: *logging


# Frontend Tests
frontendTests:
image: ${IMAGE_NAME}:${IMAGE_VERSION}-frontend-tests
Expand Down
25 changes: 13 additions & 12 deletions docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,19 @@ Want to deploy the demo and see it in action? Start here.
Want to understand how a particular language's instrumentation works? Start
here.

| Language | Auto Instrumentation | Manual Instrumentation |
|---------------|---------------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------|
| .NET | [Cart Service](./services/cartservice.md) | [Cart Service](./services/cartservice.md) |
| C++ | | [Currency Service](./services/currencyservice.md) |
| Erlang/Elixir | [Feature Flag Service](./services/featureflagservice.md) | [Feature Flag Service](./services/featureflagservice.md) |
| Go | [Checkout Service](./services/checkoutservice.md), [Product Catalog Service]( ./services/productcatalogservice.md ) | [Checkout Service](./services/checkoutservice.md), [Product Catalog Service]( ./services/productcatalogservice.md ) |
| Java | [Ad Service](./services/adservice.md) | [Ad Service](./services/adservice.md) |
| JavaScript | [Frontend]( ./services/frontend.md ) | [Frontend](./services/frontend.md), [Payment Service](./services/paymentservice.md) |
| PHP | [Quote Service](./services/quoteservice.md) | [Quote Service](./services/quoteservice.md) |
| Python | [Recommendation Service](./services/recommendationservice.md) | [Recommendation Service](./services/recommendationservice.md) |
| Ruby | [Email Service](./services/emailservice.md) | [Email Service](./services/emailservice.md) |
| Rust | [Shipping Service](./services/shippingservice.md) | [Shipping Service](./services/shippingservice.md) |
| Language | Auto Instrumentation | Manual Instrumentation |
|---------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------|
| .NET | [Cart Service](./services/cartservice.md) | [Cart Service](./services/cartservice.md) |
| C++ | | [Currency Service](./services/currencyservice.md) |
| Erlang/Elixir | [Feature Flag Service](./services/featureflagservice.md) | [Feature Flag Service](./services/featureflagservice.md) |
| Go | [Accounting Service](./services/accountingservice.md), [Checkout Service](./services/checkoutservice.md), [Product Catalog Service]( ./services/productcatalogservice.md ) | [Checkout Service](./services/checkoutservice.md), [Product Catalog Service]( ./services/productcatalogservice.md ) |
| Java | [Ad Service](./services/adservice.md) | [Ad Service](./services/adservice.md) |
| JavaScript | [Frontend]( ./services/frontend.md ) | [Frontend](./services/frontend.md), [Payment Service](./services/paymentservice.md) |
| Kotlin | [Fraud Detection Service]( ./services/frauddetectionservice.md ) | |
| PHP | [Quote Service](./services/quoteservice.md) | [Quote Service](./services/quoteservice.md) |
| Python | [Recommendation Service](./services/recommendationservice.md) | [Recommendation Service](./services/recommendationservice.md) |
| Ruby | [Email Service](./services/emailservice.md) | [Email Service](./services/emailservice.md) |
| Rust | [Shipping Service](./services/shippingservice.md) | [Shipping Service](./services/shippingservice.md) |

### Service Documentation

Expand Down
11 changes: 11 additions & 0 deletions docs/current_architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ uses [Locust](https://locust.io/) to fake user traffic.
graph TD
subgraph Service Diagram
adservice(Ad Service):::java
accountingservice(Accounting Service):::golang
cache[(Cache<br/>&#40redis&#41)]
cartservice(Cart Service):::dotnet
checkoutservice(Checkout Service):::golang
currencyservice(Currency Service):::cpp
emailservice(Email Service):::ruby
frauddetectionservice(Fraud Detection Service):::kotlin
frontend(Frontend):::javascript
frontendproxy(Frontend Proxy <br/>&#40Envoy&#41):::cpp
loadgenerator([Load Generator]):::python
Expand All @@ -23,18 +25,22 @@ recommendationservice(Recommendation Service):::python
shippingservice(Shipping Service):::rust
featureflagservice(Feature Flag Service):::erlang
featureflagstore[(Feature Flag Store<br/>&#40PostgreSQL DB&#41)]
queue[(queue<br/>&#40Kafka&#41)]

Internet -->|HTTP| frontendproxy
frontendproxy -->|HTTP| frontend
frontendproxy -->|HTTP| featureflagservice
loadgenerator -->|HTTP| frontend

accountingservice -->|TCP| queue

secustor marked this conversation as resolved.
Show resolved Hide resolved
checkoutservice --->|gRPC| cartservice --> cache
checkoutservice --->|gRPC| productcatalogservice
checkoutservice --->|gRPC| currencyservice
checkoutservice --->|HTTP| emailservice
checkoutservice --->|gRPC| paymentservice
checkoutservice -->|gRPC| shippingservice
checkoutservice -->|TCP| queue

frontend -->|gRPC| adservice
frontend -->|gRPC| cartservice
Expand All @@ -44,6 +50,8 @@ frontend -->|gRPC| currencyservice
frontend -->|gRPC| recommendationservice -->|gRPC| productcatalogservice
frontend -->|gRPC| shippingservice -->|HTTP| quoteservice

frauddetectionservice -->|TCP| queue

productcatalogservice -->|gRPC| featureflagservice

shippingservice -->|gRPC| featureflagservice
Expand All @@ -53,6 +61,7 @@ featureflagservice --> featureflagstore
end

classDef java fill:#b07219,color:white;
classDef kotlin fill:#560ba1,color:white;
classDef dotnet fill:#178600,color:white;
classDef golang fill:#00add8,color:black;
classDef cpp fill:#f34b7d,color:white;
Expand All @@ -77,9 +86,11 @@ subgraph Service Legend
rustsvc(Rust):::rust
erlangsvc(Erlang/Elixir):::erlang
phpsvc(PHP):::php
kotlinsvc(Kotlin):::kotlin
end

classDef java fill:#b07219,color:white;
classDef kotlin fill:#560ba1,color:white;
classDef dotnet fill:#178600,color:white;
classDef golang fill:#00add8,color:black;
classDef cpp fill:#f34b7d,color:white;
Expand Down
2 changes: 1 addition & 1 deletion docs/docker_deployment.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

- Docker
- [Docker Compose](https://docs.docker.com/compose/install/#install-compose) v2.0.0+
- 4 GB of RAM
- 5 GB of RAM

## Clone Repo

Expand Down
2 changes: 2 additions & 0 deletions docs/service_table.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ View [Service Graph](./current_architecture.md) to visualize request flows.

| Service | Language | Description |
|--------------------------------------------------------------|---------------|----------------------------------------------------------------------------------------------------------------------------------------------|
| [accountingservice](./services/accountingservice.md) | Go | Processes incoming orders and count the sum of all orders (mock). |
| [adservice](./services/adservice.md) | Java | Provides text ads based on given context words. |
| [cartservice](./services/cartservice.md) | DotNet | Stores the items in the user's shopping cart in Redis and retrieves it. |
| [checkoutservice](./services/checkoutservice.md) | Go | Retrieves user cart, prepares order and orchestrates the payment, shipping and the email notification. |
| [currencyservice](./services/currencyservice.md) | C++ | Converts one money amount to another currency. Uses real values fetched from European Central Bank. It's the highest QPS service. |
| [emailservice](./services/emailservice.md) | Ruby | Sends users an order confirmation email (mock). |
| [frauddetectionservice](./services/frauddetectionservice.md) | Kotlin | Analyzes incoming orders and detects fraud attempts (mock). |
| [featureflagservice](./services/featureflagservice.md) | Erlang/Elixir | CRUD feature flag service to demonstrate various scenarios like fault injection & how to emit telemetry from a feature flag reliant service. |
secustor marked this conversation as resolved.
Show resolved Hide resolved
| [frontend](./services/frontend.md) | JavaScript | Exposes an HTTP server to serve the website. Does not require signup/login and generates session IDs for all users automatically. |
| [loadgenerator](./services/loadgenerator.md) | Python/Locust | Continuously sends requests imitating realistic user shopping flows to the frontend. |
Expand Down
58 changes: 58 additions & 0 deletions docs/services/accountingservice.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# Accounting Service

This service calculates the total amount of sold products.
secustor marked this conversation as resolved.
Show resolved Hide resolved
This is only mocked and received orders are printed out.

[Accounting Service](../../src/accountingservice/)

## Traces

### Initializing Tracing

The OpenTelemetry SDK is initialized from `main` using the `initTracerProvider`
function.

```go
func initTracerProvider() (*sdktrace.TracerProvider, error) {
ctx := context.Background()

exporter, err := otlptracegrpc.New(ctx)
if err != nil {
return nil, err
}
tp := sdktrace.NewTracerProvider(
sdktrace.WithBatcher(exporter),
)
otel.SetTracerProvider(tp)
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))
return tp, nil
}
```

You should call `TracerProvider.Shutdown()` when your service is shutdown to
ensure all spans are exported. This service makes that call as part of a
deferred function in main

```go
tp, err := initTracerProvider()
if err != nil {
log.Fatal(err)
}
defer func() {
if err := tp.Shutdown(context.Background()); err != nil {
log.Printf("Error shutting down tracer provider: %v", err)
}
}()
```

### Adding Kafka ( Sarama ) auto-instrumentation

This service will receive the processed results of the Checkout Service via a
Kafka topic.
To instrument the Kafka client the ConsumerHandler implemented by the developer
has to be wrapped.

```go
handler := groupHandler{} // implements sarama.ConsumerGroupHandler
wrappedHandler := otelsarama.WrapConsumerGroupHandler(&handler)
```
Loading