docs: add metrics doc on barrier (risingwavelabs#8902)
kwannoel authored Mar 31, 2023
1 parent 96d92ca commit 74b935e
Showing 2 changed files with 53 additions and 1 deletion.
3 changes: 2 additions & 1 deletion docs/checkpoint.md
@@ -22,7 +22,8 @@ RisingWave makes checkpointing via [Chandy–Lamport algorithm](https://en.wikip

![](./images/checkpoint/checkpoint.svg)

-To guarantee consistency, RisingWave introduces Chandy-Lamport algorithm as its checkpoint scheme. In particular, RisingWave periodically repeats the following procedure:
+To guarantee consistency, RisingWave introduces Chandy-Lamport algorithm as its checkpoint scheme.
+In particular, RisingWave periodically (every `barrier_interval_ms`) repeats the following procedure:

1. The meta service initializes a barrier and broadcasts it to all source actors across the streaming engine.
2. The barrier messages go through every streaming operator (actors) in the streaming graph.
51 changes: 51 additions & 0 deletions docs/metrics.md
@@ -0,0 +1,51 @@
# Metrics

The contents of this document may be subject to frequent change.
It covers what each metric measures, and what information we may derive from it.

## Barrier Latency

Prerequisite: [Checkpoint](./checkpoint.md)

This metric measures the duration from the moment a barrier is injected into **all** sources in the stream graph
to the moment the barrier has flowed through all executors in the graph.
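
A minimal sketch of that measurement is given below; the type and method names are hypothetical, not RisingWave's actual metric-collection code. The clock starts once the barrier has reached every source and stops once every executor has collected it.

```rust
use std::time::{Duration, Instant};

// Hypothetical sketch of the measurement -- not RisingWave's actual code.
struct BarrierLatencyTimer {
    injected_at: Instant,
}

impl BarrierLatencyTimer {
    /// Start timing once the barrier has been injected into all source actors.
    fn on_injected_into_all_sources() -> Self {
        Self { injected_at: Instant::now() }
    }

    /// Stop timing once every executor in the graph has seen the barrier.
    /// The returned duration is what the barrier latency metric reports.
    fn on_collected_from_all_executors(self) -> Duration {
        self.injected_at.elapsed()
    }
}

fn main() {
    let timer = BarrierLatencyTimer::on_injected_into_all_sources();
    // ... barrier flows through the streaming graph here ...
    let latency = timer.on_collected_from_all_executors();
    println!("barrier latency: {latency:?}");
}
```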

### What can we understand from it?

Usually when examining barrier latency, we look at **high barrier latency**.

There are two contributing factors to it:
1. Time taken to actually process the streaming messages.
2. Buffer capacity for streaming messages.

#### Processing costs

When a new barrier is injected,
there will usually be streaming messages already in the stream graph (unless it is the initial barrier).
Since we preserve total order for streaming messages,
all streaming messages currently in the stream graph have to be processed
before the barrier can pass through.
If barrier latency is high, it could mean that processing these streaming messages takes a long time.
Concretely, here are some costs of processing streaming messages:
1. CPU cost of evaluating expressions.
2. I/O cost of remote exchange between fragments.
3. Stateful executor cache misses (for instance, hash join and hash agg), which incur extra cost to access state on S3 (see the sketch below).
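
To illustrate the third point, here is a hypothetical sketch of a stateful operator's state-access path (not RisingWave's actual state table API): a cache hit is a cheap in-memory lookup, while a cache miss pays an extra round trip to object storage.

```rust
use std::collections::HashMap;

// Hypothetical sketch only. A cache hit is a cheap in-memory lookup, while
// a cache miss pays an extra round trip to remote object storage (e.g. S3),
// inflating the time each streaming message spends in the operator, and
// therefore barrier latency.
struct CachedState {
    cache: HashMap<Vec<u8>, Vec<u8>>,
}

impl CachedState {
    fn get(&mut self, key: &[u8]) -> Option<Vec<u8>> {
        if let Some(value) = self.cache.get(key) {
            return Some(value.clone()); // cache hit: no I/O
        }
        // Cache miss: fetch from remote storage (a slow network call in
        // reality, stubbed out here), then fill the cache.
        let value = fetch_from_object_store(key)?;
        self.cache.insert(key.to_vec(), value.clone());
        Some(value)
    }
}

// Stand-in for a read against S3-backed state storage.
fn fetch_from_object_store(_key: &[u8]) -> Option<Vec<u8>> {
    None
}

fn main() {
    let mut state = CachedState { cache: HashMap::new() };
    // The first access misses the cache and would hit S3; later accesses are cheap.
    let _ = state.get(b"some-join-key");
}
```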

#### Buffer capacities

Next, high barrier latency could also be caused by buffers in the graph.
If some downstream buffer is congested, upstream messages cannot be queued into it, so upstream processing stalls.

For instance, if the channel in the exchange executor is full,
upstream messages cannot be sent through this channel.
This means the upstream executor cannot continue processing new stream messages until some space in the buffer is freed.
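
The effect is essentially that of a bounded channel blocking its sender. The sketch below uses a plain tokio bounded channel standing in for the exchange channel (assuming the `tokio` crate; this is not RisingWave's actual exchange implementation) to show how a full buffer stalls the upstream until the downstream frees a slot:

```rust
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::time::sleep;

#[tokio::main]
async fn main() {
    // A small, bounded buffer standing in for the exchange channel.
    let (tx, mut rx) = mpsc::channel::<u64>(4);

    // Upstream: produces messages as fast as it can.
    let upstream = tokio::spawn(async move {
        for i in 0..16u64 {
            // When the channel is full, `send` does not return until the
            // downstream consumes a message -- this is backpressure: the
            // upstream stalls instead of buffering unboundedly.
            tx.send(i).await.unwrap();
            println!("upstream sent {i}");
        }
    });

    // Downstream: consumes slowly, simulating an expensive operator.
    let downstream = tokio::spawn(async move {
        while let Some(i) = rx.recv().await {
            sleep(Duration::from_millis(50)).await;
            println!("downstream processed {i}");
        }
    });

    upstream.await.unwrap();
    downstream.await.unwrap();
}
```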

The various buffer sizes can currently be adjusted via options in the developer configuration file.
For instance, options to configure the buffer size of the exchange executor can be found [here](https://github.com/risingwavelabs/risingwave/blob/a36e01307d60491b91870ac5a37049a378fe986f/src/config/example.toml#L49-L50).

Another subtle cause is that a large buffer size can itself worsen barrier latency.
Suppose stream message processing is at its limit, and there's high latency as a result.
Typically, backpressure kicks in and the source is throttled.
However, if buffer sizes are too large, or if there are many buffers, backpressure will not reach the source immediately.
During this delay, we will continue to see high barrier latency.
A heuristic algorithm is on the way to deal with this: https://github.com/risingwavelabs/risingwave/issues/8654.
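
As a rough back-of-envelope sketch with purely illustrative numbers (not measurements from any real cluster), the delay before backpressure reaches the source scales with the total buffered data divided by the bottleneck's throughput:

```rust
fn main() {
    // Illustrative numbers only.
    let buffered_rows: f64 = 8.0 * 16_384.0; // total capacity of buffers between source and bottleneck
    let bottleneck_throughput: f64 = 10_000.0; // rows per second the slowest operator can handle

    // Until these buffered rows drain, the source keeps producing at full
    // speed and barrier latency stays high.
    let backpressure_delay_secs = buffered_rows / bottleneck_throughput;
    println!("backpressure reaches the source after roughly {backpressure_delay_secs:.1}s");
}
```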
