Most datalog query runtimes implement a variant of semi-naive evaluation, where queries are iteratively refined until they reach a fixed point. Such designs can efficiently compute views over a fixed database, but are insufficient for incrementalizing evaluation over a database that changes over time. Since semi-naive evaluation of nonmonotonic queries requires re-evaluation on changes to the input, it can be inefficient for many applications, especially when real-time operation is desired.
This document describes an alternative runtime based on the dataflow model, which represents programs as circuits (graphs) whose vertices and edges correspond to computation over streams of data. These circuits MAY be incrementalized to instead operate over deltas, with the results combined into a final materialized view. Converting from relational algebra to dataflow is a fully mechanical, static transformation.
DBSP and Differential Dataflow were not developed as part of the Rhizome DB project. The papers and documentation lacked some detail required to implement such a system. This document contains the basics that are required to put these ideas into practice.
The following describes the basic concepts and data types:
A 𝕫-Set is a set of triples, each element associated with a timestamp and a weight: {element, timestamp, weight}.
Index | Name | Description |
---|---|---|
0 | Element | The actual data being processed |
1 | Timestamp | Information about processing order |
2 | Weight | The number of derivations (or deletions) the element is responsible for |
𝕫-Sets MAY be written as lists of 3-tuples:
[
{element1, timestamp1, weight1},
{element2, timestamp2, weight2},
...
]
Implementations SHOULD support efficient sequential access of a 𝕫-Set:
- By element
- By timestamp
Some operations return 𝕫-Sets.
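As an illustration (in Python; the names below are hypothetical, not part of this specification), a 𝕫-Set can be sketched as a mapping from (element, timestamp) pairs to weights, merging weights on insertion and dropping entries whose weight reaches zero:

```python
# Illustrative sketch: a zset is a dict mapping (element, timestamp) -> weight.
# Names here are hypothetical, not part of this specification.

def zset_insert(zset, element, timestamp, weight):
    """Add a weighted element, merging weights and dropping zeros."""
    key = (element, timestamp)
    new_weight = zset.get(key, 0) + weight
    if new_weight == 0:
        zset.pop(key, None)
    else:
        zset[key] = new_weight
    return zset

zset = {}
zset_insert(zset, "foo", 0, 1)
zset_insert(zset, "foo", 0, 1)   # weights merge: "foo" now carries weight 2
zset_insert(zset, "bar", 0, -1)  # a negative weight models a deletion
zset_insert(zset, "bar", 0, 1)   # cancels out: "bar" is removed entirely
```

This representation makes the weight-merging behavior of the operators below easy to state, though implementations are free to choose other layouts.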
An Indexed 𝕫-Set is a set of key-value pairs, each associated with a weight and timestamp.
Indexed 𝕫-Sets MAY be written as lists of 3-tuples, whose first field is a key-value pair:
[
{{key1, value1}, timestamp1, weight1},
{{key2, value2}, timestamp2, weight2},
...
]
Implementations SHOULD support efficient sequential access of an Indexed 𝕫-Set:
- By key
- By value
- By timestamp.
Some operations return Indexed 𝕫-Sets.
A Trace is a collection of deltas, combining multiple 𝕫-Sets or Indexed 𝕫-Sets across timestamps.
Traces MAY be written as lists of 4-tuples:
[
{key1, value1, timestamp1, weight1},
{key2, value2, timestamp2, weight2},
...
]
Implementations MUST support efficient sequential access of traces: first by key, then by value, and lastly by timestamp.
A weight describes the number of derivations of an element within a 𝕫-Set. Weights MUST be represented by integers, and MAY be either positive or negative.
Positive weights indicate the number of derivations of that element within the 𝕫-Set, while negative weights indicate retractions of those derivations.
Dataflow engines require a concept of ordering. Each node in the circuit contains an incrementing integer "clock". In most (but not all) cases, this represents the number of times that the node has been evaluated. In general, each node has clock_start and clock_stop hooks that are called before and after evaluating that node.
Subcircuits are more complex: nodes are also notified when any of their parent circuits start or stop a clock cycle. Implementations generally parameterize those hooks over a "scope" that gives the layer of the circuit the clock event corresponds to. This is useful for things like cleaning up state at the end of an epoch.
Timestamps are formed from these clocks, one component per circuit layer, and are compared using the product partial order.
Below is an example of a path through a root circuit:
[0, 1, 2, 3, ...]
Whereas a subcircuit under that root may progress through these:
[
(0, 0),
(0, 1),
(0, 2),
(1, 0),
(1, 1),
(1, 2),
...
]
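The product partial order on such nested timestamps can be sketched as follows (a hypothetical Python helper, not from this specification): one timestamp precedes another only when every component does, so some pairs are incomparable.

```python
def product_less_equal(a, b):
    """Product order on timestamp tuples: a <= b iff every component of a
    is <= the corresponding component of b."""
    return all(x <= y for x, y in zip(a, b))

# (0, 1) precedes (1, 1) in the product order...
assert product_less_equal((0, 1), (1, 1))
# ...but (0, 2) and (1, 0) are incomparable: neither precedes the other.
assert not product_less_equal((0, 2), (1, 0))
assert not product_less_equal((1, 0), (0, 2))
```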
A circuit is an embedding of a PomoRA query plan into a directed (potentially cyclic) graph whose vertices (nodes) represent computation over streams, and whose edges describe those streams.
Circuits MAY contain subcircuits, which MUST represent recursive subcomputations that evaluate to a fixed point by the end of every iteration. For each iteration, a circuit is run by evaluating its nodes (subcircuits and single nodes) in some topologically sorted order, until a fixed point is reached.
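The evaluation loop above can be sketched as follows. This is a simplification under assumed node and stream types; the specification does not mandate this shape, and all names are illustrative:

```python
def run_circuit(nodes, edges):
    """Evaluate nodes in topological order until no node changes its output.

    `nodes` maps node id -> a function from a list of input values to an
    output value; `edges` maps node id -> list of upstream node ids. For
    brevity, `nodes` is assumed to already be in topological order.
    """
    outputs = {node_id: None for node_id in nodes}
    while True:
        changed = False
        for node_id, op in nodes.items():
            inputs = [outputs[src] for src in edges[node_id]]
            result = op(inputs)
            if result != outputs[node_id]:
                outputs[node_id] = result
                changed = True
        if not changed:  # fixed point reached
            return outputs

# A two-node circuit: a source emitting a constant, and a doubling operator.
nodes = {"source": lambda _: 3, "double": lambda ins: ins[0] * 2}
edges = {"source": [], "double": ["source"]}
```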
A stream is an infinite sequence of values. A node processing a stream progresses one tuple at a time, incrementing its counter after every tuple. It is RECOMMENDED that streams not be reified directly; they MAY instead be modeled as cells containing the stream's value at the current timestamp.
The edges between nodes in a circuit describe streams of values flowing from the output of one node to the input of another. It is RECOMMENDED that these edges be defined in terms of the IDs of the nodes they connect.
A node is a vertex of a circuit, and describes a computation over the circuit's streams.
Every node is associated with both a local ID and a global ID. Local IDs MUST be unique among all nodes belonging to the same circuit, and global IDs MUST be unique across all nodes.
It is RECOMMENDED that local IDs be represented by a node's index into its parent's nodes, and that global IDs be represented as the path formed by the local IDs of a node's ancestors.
Nodes come in several types:
Type | Inputs | Outputs |
---|---|---|
Operator | N-ary | 1 |
Child | 0 | N-ary |
Feedback | 1 | 1 |
Import | 1 | 1 |
Sink | N-ary | 0 |
Source | 0 | 1 |
An operator node performs an operation on its input stream(s), outputting the result over a stream.
Operator nodes achieve a fixed point when their associated operator has done so.
A child node introduces a subcircuit, which MUST be used to perform recursive computations. Such circuits evaluate to a fixed point each iteration, using the same rules as the root circuit, then emit their result to downstream nodes.
Feedback nodes introduce a temporal edge between nodes in subsequent iterations of a circuit. Such nodes support persisting data between iterations of a circuit, and are also used to implement recursive circuits by propagating partial results forward in time until a fixed point is reached.
These nodes introduce apparent cycles into a circuit; however, implementations SHOULD NOT treat them as such. Instead, their output stream SHOULD be lazily evaluated as part of the subsequent iteration.
Intuitively, these nodes can be thought of as delaying a stream by one iteration.
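Under that intuition, a feedback node can be sketched as a one-element buffer (hypothetical Python, not from this specification): each iteration it emits what it received on the previous iteration, starting from an empty 𝕫-Set.

```python
class Feedback:
    """Delays its input stream by one iteration, emitting an empty
    Z-Set (modeled here as an empty list) on the first iteration."""

    def __init__(self):
        self.buffer = []

    def step(self, value):
        previous, self.buffer = self.buffer, value
        return previous

fb = Feedback()
assert fb.step([("a", 0, 1)]) == []             # nothing buffered yet
assert fb.step([("b", 0, 1)]) == [("a", 0, 1)]  # previous iteration's input
```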
Import nodes attach a parent stream to a subcircuit.
Sink nodes perform an operation against their input streams. They MUST NOT produce circuit outputs.
Source nodes emit data over a stream. They MUST NOT accept any inputs from the circuit.
An operator specifies an operation against a stream, and is represented by a node. Operators MAY be stateful, and linear operators are those which can be computed using only the deltas at the current timestamp.
Name | Linearity | Input Types | Output Type |
---|---|---|---|
Aggregate | Varies | Indexed 𝕫-Set | 𝕫-Set |
Consolidate | Linear | Trace | 𝕫-Set |
Distinct | Non-Linear | 𝕫-Set | 𝕫-Set |
Filter | Linear | 𝕫-Set | 𝕫-Set |
Index With | Linear | 𝕫-Set | Indexed 𝕫-Set |
Inspect | Linear | 𝕫-Set | 𝕫-Set |
Map | Linear | 𝕫-Set | 𝕫-Set |
Negate | Linear | 𝕫-Set | 𝕫-Set |
Delay Trace | Linear | Trace | Trace |
Delay | Linear | 𝕫-Set | 𝕫-Set |
Distinct Trace | Non-Linear | 𝕫-Set, Trace | 𝕫-Set |
Join Stream | Linear | Indexed 𝕫-Set, Indexed 𝕫-Set | 𝕫-Set |
Join Trace | Bilinear | Indexed 𝕫-Set, Trace | 𝕫-Set |
Minus | Linear | 𝕫-Set, 𝕫-Set | 𝕫-Set |
Plus | Linear | 𝕫-Set, 𝕫-Set | 𝕫-Set |
Trace Append | Non-Linear | 𝕫-Set, Trace | Trace |
Untimed Trace Append | Non-Linear | 𝕫-Set, Trace | Trace |
Distinct Incremental | Non-Linear | 𝕫-Set, Trace | 𝕫-Set |
The aggregate operator takes an Indexed 𝕫-Set as input and applies an aggregate function over it, returning a 𝕫-Set that summarizes the values under each key, associating a weight of 1 with each element. The resulting 𝕫-Set is untimed; its timestamps are written as nil in the examples below.
Implementations MAY support user defined aggregates, but MUST support the aggregate functions described in the specification for the query language.
If additional aggregates are supported, they MUST be pure functions. It is RECOMMENDED that implementations enforce this constraint.
For example:
aggregate(count, [
{{1, "foo"}, 0, 1},
{{1, "bar"}, 0, 1},
{{2, "baz"}, 0, 1}
]) => [
{{1, 2}, nil, 1},
{{2, 1}, nil, 1}
]
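The example above can be reproduced with the following Python sketch (function names are assumptions, not part of the specification). It groups values by key, applies the aggregate to each group, and emits weight-1 results:

```python
def aggregate(agg_fun, indexed_zset):
    """Apply agg_fun to the weighted values grouped under each key of an
    Indexed Z-Set, emitting one weight-1 element per key, untimed (None)."""
    groups = {}
    for (key, value), _timestamp, weight in indexed_zset:
        groups.setdefault(key, []).append((value, weight))
    return [((key, agg_fun(values)), None, 1) for key, values in groups.items()]

def count(values):
    """Count derivations under a key: the sum of the group's weights."""
    return sum(weight for _value, weight in values)

result = aggregate(count, [
    ((1, "foo"), 0, 1),
    ((1, "bar"), 0, 1),
    ((2, "baz"), 0, 1),
])
```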
Consolidation operators merge all deltas in the input trace into a 𝕫-Set, dropping elements whose combined weight is zero.
This operator is intended to combine deltas from across multiple timestamps into a single untimed 𝕫-Set.
For example:
consolidate([
{0, 0, 0, 1},
{0, 0, 0, -1},
{0, 1, 0, 1},
{0, 1, 0, 1},
{1, 2, 0, 2},
{1, 3, 0, 1},
{1, 3, 0, -1},
{1, 4, 0, -1},
{2, 2, 0, 1},
{2, 4, 0, 1}
]) => [
{{0, 1}, nil, 2},
{{1, 2}, nil, 2},
{{1, 4}, nil, -1},
{{2, 2}, nil, 1},
{{2, 4}, nil, 1}
]
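A Python sketch of consolidation (illustrative names): sum the weights per key-value pair across all timestamps, then drop pairs whose total is zero.

```python
def consolidate(trace):
    """Merge all deltas in a trace (4-tuples of key, value, timestamp,
    weight) into an untimed Z-Set, dropping zero-weight elements."""
    totals = {}
    for key, value, _timestamp, weight in trace:
        totals[(key, value)] = totals.get((key, value), 0) + weight
    return [((key, value), None, weight)
            for (key, value), weight in totals.items() if weight != 0]

result = consolidate([
    (0, 0, 0, 1), (0, 0, 0, -1),
    (0, 1, 0, 1), (0, 1, 0, 1),
    (1, 2, 0, 2),
    (1, 3, 0, 1), (1, 3, 0, -1),
    (1, 4, 0, -1),
    (2, 2, 0, 1), (2, 4, 0, 1),
])
```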
The distinct operator returns a 𝕫-Set containing every element of the input whose weight is positive, assigning each a weight of 1.
For example:
distinct([
{0, 0, 1},
{1, 0, 2},
{2, 0, -1},
{3, 0, 0}
]) => [
{0, 0, 1},
{1, 0, 1}
]
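A minimal Python sketch of this operator, assuming the input has already been consolidated so each element appears at most once:

```python
def distinct(zset):
    """Keep elements with positive weight, normalizing each weight to 1.
    Assumes the input is consolidated (one triple per element)."""
    return [(element, timestamp, 1)
            for element, timestamp, weight in zset if weight > 0]

result = distinct([(0, 0, 1), (1, 0, 2), (2, 0, -1), (3, 0, 0)])
```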
Filters a 𝕫-Set by a predicate. The predicate MUST be a pure function returning a boolean.
For example:
is_positive = fn x -> x >= 1 end
filter(is_positive, [
{0, 0, 1},
{1, 0, 2},
{2, 1, -3}
]) => [
{1, 0, 2},
{2, 1, -3}
]
Indexing operators group elements of a 𝕫-Set according to some key function, returning an Indexed 𝕫-Set.
For example:
key_function = fn {src, dst, cost} -> src end
index_with(key_function, [
{{0, 1, 1}, 0, 1},
{{1, 2, 1}, 1, 1},
{{1, 3, 2}, 1, -1}
]) => [
{{0, {0, 1, 1}}, 0, 1},
{{1, {1, 2, 1}}, 1, 1},
{{1, {1, 3, 2}}, 1, -1}
]
The inspect operator applies a callback to a 𝕫-Set, and returns the original 𝕫-Set unchanged.
This operator is primarily intended as a debugging aid, and can be used to output the contents of streams at runtime.
For example:
inspect_fun = fn x -> IO.inspect(x) end
inspect(inspect_fun, [
{0, 1, 1},
{1, 1, 1}
]) => [
{0, 1, 1},
{1, 1, 1}
] # Also printing out [{0, 1, 1}, {1, 1, 1}]
The map operator transforms elements of a 𝕫-Set according to some function. The function MUST be pure.
For example:
add_one = fn x -> x + 1 end
map(add_one, [
{0, 0, 1},
{1, 0, 2},
{2, 1, -3}
]) => [
{1, 0, 1},
{2, 0, 2},
{3, 1, -3}
]
The negate operator flips the sign of the weight of each element in a 𝕫-Set.
For example:
negate([
{0, 0, 1},
{1, 0, -1},
{2, 0, -2}
]) => [
{0, 0, -1},
{1, 0, 1},
{2, 0, 2}
]
The delay trace operator returns the previous input trace.
The delay ($z^{-1}$) operator returns the previous input 𝕫-Set, delaying its input stream by one timestamp.
Distinct trace is a variant of Distinct that offers better performance for incremental computation and computes across multiple timestamps, with support for use in nested contexts, like recursive circuits. It computes the distinct elements of the 𝕫-Set in its first argument, with respect to the Trace in its second, returning them in a new 𝕫-Set.
Note that because this operator computes the delta of Distinct, it is possible for returned elements to have negative weights, if those elements are deleted between timestamps.
Distinct is not a linear operation, and requires access to the entire history of updates; however, there are a few observations that can reduce the search space of timestamps to examine.
Consider evaluating the operator at some timestamp $\langle e, i \rangle$, where $e$ is the epoch and $i$ the iteration.
Then there are two possible classes of elements which may be returned:

- Elements in the current input $\Bbb{Z}$-Set
- Elements that were returned in response to a previous input, at a timestamp $\langle e, i_0 \rangle$ such that $i_0 < i$, and where the element was also returned at a timestamp $\langle e_0, i \rangle$ such that $e_0 < e$

For each such element, with weight $w$ in the current input ($w = 0$ if the element is absent from it):

- $w_1$ is computed as the sum of all weights with which that element appears at times $\langle e_0, i_0 \rangle$, for $e_0 < e$ and $i_0 < i$
- $w_2$ is computed as the sum of all weights with which that element appears at times $\langle e_0, i \rangle$, for $e_0 < e$
- $w_3$ is computed as the sum of all weights with which that element appears at times $\langle e, i_0 \rangle$, for $i_0 < i$
- $d_0$ is computed, such that:
  - $d_0 = 1$, if $w_1 \le 0 \land w_1 + w_2 > 0$
  - $d_0 = -1$, if $w_1 > 0 \land w_1 + w_2 \le 0$
  - $d_0 = 0$, otherwise
- $d_1$ is computed, such that:
  - $d_1 = 1$, if $w_1 + w_3 \le 0 \land w_1 + w_2 + w_3 + w > 0$
  - $d_1 = -1$, if $w_1 + w_3 > 0 \land w_1 + w_2 + w_3 + w \le 0$
  - $d_1 = 0$, otherwise
- If $d_1 - d_0 \ne 0$, then the element is returned in the output $\Bbb{Z}$-Set, with weight $d_1 - d_0$
For example:
b00 = [
{0, {0, 0}, 1},
{2, {0, 0}, 1},
{3, {0, 0}, -1}
]
b01 = [
{5, {0, 1}, 1}
]
b10 = [
{5, {1, 0}, 1}
]
b11 = [
{0, {1, 1}, 1},
{1, {1, 1}, 1},
{2, {1, 1}, -1},
{3, {1, 1}, 1},
{4, {1, 1}, -1}
]
t00 = []
t01 = insert_trace(t00, b00)
t10 = insert_trace(t01, b01)
t11 = insert_trace(t10, b10)
# At time {0, 0}
distinct_trace(b00, t00) => [
{0, nil, 1},
{2, nil, 1}
]
# At time {0, 1}
distinct_trace(b01, t01) => [
{5, nil, 1}
]
# At time {1, 0}
distinct_trace(b10, t10) => [
{5, nil, 1}
]
# At time {1, 1}
distinct_trace(b11, t11) => [
{1, nil, 1},
{2, nil, -1},
{5, nil, -1}
]
A more complete Elixir example, computing the weights and deltas for a single element:
# Sum of weights where epoch(t) < epoch(time) and iteration(t) < iteration(time)
w1 = 0
# Sum of weights where epoch(t) < epoch(time) and iteration(t) == iteration(time)
w2 = 0
# Sum of weights where epoch(t) == epoch(time) and iteration(t) < iteration(time)
w3 = 0
{trace_cursor, times} = ZSetTraceCursor.times(trace_cursor)
{w1, w2, w3, next_time} =
Enum.reduce(times, {w1, w2, w3, nil}, fn
{{t_epoch, t_iteration} = t, w}, {w1, w2, w3, next_time} ->
cond do
t_epoch == epoch ->
if Algebra.PartialOrder.less_than(t_iteration, iteration) do
{w1, w2, w3 + w, next_time}
else
{w1, w2, w3, next_time}
end
Algebra.PartialOrder.less_than(t_iteration, iteration) ->
{w1 + w, w2, w3, next_time}
t_iteration == iteration ->
{w1, w2 + w, w3, next_time}
is_nil(next_time) or Algebra.PartialOrder.less_than(t, next_time) ->
{w1, w2, w3, t}
true ->
{w1, w2, w3, next_time}
end
end)
w12 = w1 + w2
w13 = w1 + w3
w123 = w12 + w3
w1234 = w123 + weight
delta_old =
cond do
w1 <= 0 and w12 > 0 ->
1
w1 > 0 and w12 <= 0 ->
-1
true ->
0
end
delta_new =
cond do
w13 <= 0 and w1234 > 0 ->
1
w13 > 0 and w1234 <= 0 ->
-1
true ->
0
end
output =
if delta_old == delta_new do
output
else
w = delta_new - delta_old
tuple = {{key, {}}, nil, w}
[tuple | output]
end
The join stream operator MUST merge two Indexed 𝕫-Sets, matching elements by key, applying a join function to each matched pair of values, and multiplying the corresponding weights.
For example:
join_fun = fn key, v1, v2 -> {key, {v1, v2}} end
a = [
{{:a, 1}, 0, 1},
{{:b, 2}, 0, 2},
{{:c, 2}, 0, 1}
]
b = [
{{:a, 1}, 0, 1},
{{:b, 3}, 0, 1},
{{:b, 4}, 0, -1}
]
join_stream(join_fun, a, b) => [
{{:a, {1, 1}}, nil, 1},
{{:b, {2, 3}}, nil, 2},
{{:b, {2, 4}}, nil, -2}
]
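The example above can be reproduced with the following Python sketch (names are illustrative): build an index over one side's keys, then for each element of the other side emit the joined values with multiplied weights.

```python
def join_stream(join_fun, left, right):
    """Join two Indexed Z-Sets on key, applying join_fun to each matching
    pair of values and multiplying the corresponding weights."""
    by_key = {}
    for (key, value), _timestamp, weight in left:
        by_key.setdefault(key, []).append((value, weight))
    output = []
    for (key, v2), _timestamp, w2 in right:
        for v1, w1 in by_key.get(key, []):
            output.append((join_fun(key, v1, v2), None, w1 * w2))
    return output

join_fun = lambda key, v1, v2: (key, (v1, v2))
a = [(("a", 1), 0, 1), (("b", 2), 0, 2), (("c", 2), 0, 1)]
b = [(("a", 1), 0, 1), (("b", 3), 0, 1), (("b", 4), 0, -1)]
result = join_stream(join_fun, a, b)
```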
The join trace operator is a variant that joins an Indexed 𝕫-Set against a Trace. This takes advantage of the bilinearity of relational joins in order to support incremental joins across timestamps.
This operator MUST return a 𝕫-Set.
Join trace behaves similarly to Join Stream. The first argument MUST represent the deltas for the current timestamp. The second argument MUST be a trace containing all updates observed thus far. In this way, an incremental join MAY be implemented as follows:
join_fun = ...
join_fun_flipped =
fn k, v1, v2 ->
join_fun(k, v2, v1)
end
a = ... # some Z-Set
b = ... # some Z-Set
a_trace = z1_trace(a)
b_trace = z1_trace(b)
incremental_join(join_fun, a, b) =
join_stream(join_fun, a, b) +
join_trace(join_fun, a, b_trace) +
join_trace(join_fun_flipped, b, a_trace)
Where z1_trace(x) denotes an application of the Delay Trace operator, join_fun_flipped flips the value arguments of join_fun, and + denotes the Plus operator.
For example:
join_fun = fn key, v1, v2 -> {key, {v1, v2}} end
zset = [
{{:a, 0}, 0, 1},
{{:a, 0}, 0, -1},
{{:a, 1}, 0, 1},
{{:b, 2}, 0, 2},
{{:c, 2}, 0, 1}
]
trace = [
{:a, 1, 0, 1},
{:b, -3, 0, -1},
{:b, 3, 0, 1},
{:b, 4, 0, -1},
{:c, 4, 0, 1}
]
join_trace(join_fun, zset, trace) => [
{{:a, {1, 1}}, nil, 1},
{{:b, {2, -3}}, nil, -2},
{{:b, {2, 3}}, nil, 2},
{{:b, {2, 4}}, nil, -2},
{{:c, {2, 4}}, nil, 1}
]
The minus operator MUST subtract all weights for matching elements in two 𝕫-Sets, dropping elements whose resulting weight is zero.
For example:
a = [
{0, 0, 1},
{1, 0, 1},
{2, 0, 2},
{3, 0, 1}
]
b = [
{0, 0, 1},
{1, 0, -1},
{2, 0, 1}
]
minus(a, b) => [
{1, 0, 2},
{2, 0, 1},
{3, 0, 1}
]
The plus operator MUST add all weights for matching elements in two 𝕫-Sets, dropping elements whose resulting weight is zero.
For example:
a = [
{0, 0, 1},
{1, 0, 1},
{2, 0, 2},
{3, 0, 1}
]
b = [
{0, 0, 1},
{1, 0, -1},
{2, 0, 1}
]
plus(a, b) => [
{0, 0, 2},
{2, 0, 3},
{3, 0, 1}
]
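Both operators can be sketched as pointwise weight arithmetic per (element, timestamp) pair, dropping zero totals (illustrative Python; names are assumptions):

```python
def plus(a, b):
    """Add weights for matching (element, timestamp) pairs of two Z-Sets,
    dropping results whose weight is zero."""
    totals = {}
    for element, timestamp, weight in a + b:
        key = (element, timestamp)
        totals[key] = totals.get(key, 0) + weight
    return [(e, t, w) for (e, t), w in totals.items() if w != 0]

def minus(a, b):
    """Subtract b's weights from a's by negating b and adding."""
    return plus(a, [(e, t, -w) for e, t, w in b])

a = [(0, 0, 1), (1, 0, 1), (2, 0, 2), (3, 0, 1)]
b = [(0, 0, 1), (1, 0, -1), (2, 0, 1)]
```

Defining minus in terms of plus and negation mirrors how a circuit would compose the Negate and Plus operators.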
The trace append operator inserts the input 𝕫-Set into the input Trace, retaining the elements' timestamps.
The untimed trace append operator inserts the input 𝕫-Set into the input Trace, discarding the elements' timestamps.
Distinct incremental is a variant of Distinct that offers better performance for incremental computation and computes across multiple timestamps. It computes the distinct elements of the 𝕫-Set in its first argument, with respect to the Trace in its second, returning them in a new 𝕫-Set.
Note that because this operator computes the delta of Distinct, it is possible for returned elements to have negative weights, if those elements are deleted between timestamps.
This computation can be performed by returning the elements of the input 𝕫-Set whose total weight changes sign when the input is added to the trace: an element whose weight in the trace is non-positive and becomes positive is returned with weight 1, while an element whose weight is positive and becomes non-positive is returned with weight -1.
For example:
batch = [
{0, nil, 2},
{2, nil, 1},
{3, nil, -1}
]
distinct_incremental(batch, [
{0, 0, 1},
]) => [
{2, nil, 1}
]
distinct_incremental(batch, [
{2, 1, 1},
{3, 1, 1}
]) => [
{0, nil, 1},
{3, nil, -1}
]
distinct_incremental(batch, [
{0, 2, -1},
]) => [
{0, nil, 1},
{2, nil, 1}
]
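The sign-change rule above can be sketched in Python as follows (names assumed; trace weights are summed per element before comparing):

```python
def distinct_incremental(batch, trace):
    """Emit +1 when an element's total weight becomes positive after adding
    the batch to the trace, and -1 when it stops being positive."""
    old = {}
    for element, _timestamp, weight in trace:
        old[element] = old.get(element, 0) + weight
    output = []
    for element, _timestamp, weight in batch:
        before = old.get(element, 0)
        after = before + weight
        if before <= 0 and after > 0:
            output.append((element, None, 1))
        elif before > 0 and after <= 0:
            output.append((element, None, -1))
    return output

batch = [(0, None, 2), (2, None, 1), (3, None, -1)]
result = distinct_incremental(batch, [(0, 0, 1)])
```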