The state API is required for supporting Stateful Transforms.
In particular, DoFns that use the State API are stateful transforms.
The simplest way to ensure consistency with stateful transforms is to have only a single bundle process a given key at a time. This requires some rework and key awareness in the ElementManager around bundle scheduling. It also requires inserting "fusion breaks" before the stateful DoFn, so that prism can ensure correct execution once fusion is implemented. Without fusion, all DoFns already execute with a "checkpointed" state.
This also covers cleaning up hacks for current Side Input support, which leak memory. [DONE #29423].
The Runner provides a store for state that is unique per key, per window, per transform. (State is garbage collected at the end of the window, just like side inputs, and no transform shares state with another.)
Processing and state remain correct by restricting execution of a given key to a single bundle at a time, throughout the job, with respect to the transform. That is, no other bundle processing that transform may execute with the same key concurrently.
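As a rough illustration of the "unique per key, per window, per transform" store described above, here is a minimal sketch in Go. The names stateKey, stateStore, Append, Get, and ExpireWindow are hypothetical and are not prism's actual types. It assumes bag-like state stored as raw bytes.

```go
package main

// stateKey identifies one state cell. Hypothetical type, for illustration only.
type stateKey struct {
	transform string // transform ID; no transform shares state with another
	window    string // encoded window
	userKey   string // encoded element key, held as a string so it can be a map key
}

// stateStore holds committed state values per (transform, window, key).
type stateStore struct {
	cells map[stateKey][][]byte
}

func newStateStore() *stateStore {
	return &stateStore{cells: map[stateKey][][]byte{}}
}

// Append adds a value to the cell for the given transform, window, and key.
func (s *stateStore) Append(transform, window string, key, val []byte) {
	k := stateKey{transform, window, string(key)}
	s.cells[k] = append(s.cells[k], val)
}

// Get returns the committed values for the cell, or nil if there are none.
func (s *stateStore) Get(transform, window string, key []byte) [][]byte {
	return s.cells[stateKey{transform, window, string(key)}]
}

// ExpireWindow drops every cell for the transform's window, mimicking
// garbage collection of state at the end of the window.
func (s *stateStore) ExpireWindow(transform, window string) {
	for k := range s.cells {
		if k.transform == transform && k.window == window {
			delete(s.cells, k)
		}
	}
}
```

Keying the map on all three dimensions keeps one transform's state invisible to another, and lets a whole window be dropped without touching other windows.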
The first portion is fairly simple, with the obvious things:
We add storage of the committed state, per window, transform, and key, to engine.stageState.
The worker.State then handles the transient side of state: requests and responses to the SDK and so forth.
At the end of bundle processing, we can additionally persist the current state back to the element manager, via the PersistBundle method.
The 2nd portion is trickier: How do we ensure a given key is only processed by a single bundle at once, relatively efficiently?
The stageState should keep a set of keys currently unavailable for scheduling, because they are currently being executed by a worker.
In stageState's AddPending (usually called from PersistBundle), knowing that the stage is stateful, we could eagerly extract the bytes for each element's key and associate the pending data with that key. Optionally, we could also sort the values by event time, for some sort of linear ordering guarantee, by putting them in a per-key pending heap.
TANGENT: This behavior would also be useful for stages marked as aggregate, to "pre-bucket" elements as they come in, improving aggregation performance somewhat.
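The per-key pending heap idea can be sketched with Go's container/heap. This is only an illustration under assumptions: element, elementHeap, pendingByKey, Add, and Drain are hypothetical names, and event time is modelled as a plain int64.

```go
package main

import "container/heap"

// element is a hypothetical pending element: encoded key, event time, payload.
type element struct {
	key       string
	eventTime int64 // assumed to be e.g. milliseconds since the epoch
	payload   []byte
}

// elementHeap is a min-heap of elements ordered by event time.
type elementHeap []element

func (h elementHeap) Len() int           { return len(h) }
func (h elementHeap) Less(i, j int) bool { return h[i].eventTime < h[j].eventTime }
func (h elementHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *elementHeap) Push(x any)        { *h = append(*h, x.(element)) }
func (h *elementHeap) Pop() any {
	old := *h
	n := len(old)
	e := old[n-1]
	*h = old[:n-1]
	return e
}

// pendingByKey buckets incoming elements by key as they arrive.
type pendingByKey map[string]*elementHeap

// Add pushes each element onto the heap for its key, creating it if needed.
func (p pendingByKey) Add(es ...element) {
	for _, e := range es {
		h, ok := p[e.key]
		if !ok {
			h = &elementHeap{}
			p[e.key] = h
		}
		heap.Push(h, e)
	}
}

// Drain removes and returns all pending elements for a key in event-time order.
func (p pendingByKey) Drain(key string) []element {
	h, ok := p[key]
	if !ok {
		return nil
	}
	out := make([]element, 0, h.Len())
	for h.Len() > 0 {
		out = append(out, heap.Pop(h).(element))
	}
	delete(p, key)
	return out
}
```

Bucketing at insertion time means bundle creation only has to pick keys, not scan and regroup every pending element.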
In startBundle, which initializes a bundle with elements to process, a stateful stage examines the available elements by key and associates them all with the bundle to be processed, avoiding keys that are currently in progress.
Any key that has elements selected should be marked as unschedulable.
We should store a list of the keys handled by the bundle in the inprogress map, so that we can properly "free" the keys for new data scheduling after completion.
This is where we would do any "one key per bundle, or one key per element" type restrictions on processing, such as to improve result latency. However, the current goal is to improve compute efficiency, which means we'd prefer to batch things together.
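The scheduling steps above (bucket pending data by key, skip in-progress keys when starting a bundle, free the keys on completion) can be sketched as follows. This is a simplified stand-in, not prism's actual stageState: the stage type and its fields are hypothetical, and AddPending, StartBundle, and PersistBundle here only borrow the names used above, with invented signatures.

```go
package main

import "sort"

// stage is a hypothetical stand-in for a stateful stage's scheduling state.
type stage struct {
	pending    map[string][][]byte // pending element payloads, bucketed by key
	inProgress map[string]bool     // keys currently unavailable for scheduling
	bundles    map[string][]string // bundle ID -> keys held by that bundle
}

func newStage() *stage {
	return &stage{
		pending:    map[string][][]byte{},
		inProgress: map[string]bool{},
		bundles:    map[string][]string{},
	}
}

// AddPending associates an element with its key.
func (s *stage) AddPending(key string, elm []byte) {
	s.pending[key] = append(s.pending[key], elm)
}

// StartBundle batches all pending elements for every schedulable key into one
// bundle. Keys already in progress are skipped and their data stays pending.
// It returns nil when no key can be scheduled.
func (s *stage) StartBundle(bundleID string) map[string][][]byte {
	keys := make([]string, 0, len(s.pending))
	for k := range s.pending {
		if !s.inProgress[k] {
			keys = append(keys, k)
		}
	}
	if len(keys) == 0 {
		return nil
	}
	sort.Strings(keys) // deterministic key order, only for readability
	out := make(map[string][][]byte, len(keys))
	for _, k := range keys {
		out[k] = s.pending[k]
		delete(s.pending, k)
		s.inProgress[k] = true // mark the key as unschedulable
	}
	s.bundles[bundleID] = keys
	return out
}

// PersistBundle frees the bundle's keys so new data for them can be scheduled.
func (s *stage) PersistBundle(bundleID string) {
	for _, k := range s.bundles[bundleID] {
		delete(s.inProgress, k)
	}
	delete(s.bundles, bundleID)
}
```

This version batches as many keys as possible into each bundle, matching the stated preference for compute efficiency. A latency-oriented variant would cap the number of keys or elements taken in StartBundle.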
Since Beam doesn't currently have a notion of "PreservesKeys", stateful DoFns must be handled independently of other stateful DoFns. They might be able to be sibling fused, but they must not be producer/consumer fused. In practice, simply don't fuse them together. The reason is that the key could change between the two DoFns, which could violate the unique processing restriction.
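To make the "fusion break before a stateful DoFn" rule concrete, here is a minimal sketch over a linear chain of transforms. It assumes a hypothetical transform type and splitStages function. It does not reflect how prism builds stages, and it ignores sibling fusion entirely.

```go
package main

// transform is a hypothetical minimal description of a DoFn in a linear chain.
type transform struct {
	id       string
	stateful bool
}

// splitStages groups a linear chain into stages, starting a new stage before
// every stateful transform. A stateful DoFn is therefore always the head of
// its stage, and two stateful DoFns never end up in the same stage.
func splitStages(chain []transform) [][]string {
	var stages [][]string
	for _, t := range chain {
		if t.stateful || len(stages) == 0 {
			stages = append(stages, []string{t.id})
			continue
		}
		last := len(stages) - 1
		stages[last] = append(stages[last], t.id)
	}
	return stages
}
```

Because the stateful DoFn heads its stage, the key the runner schedules on is the key the DoFn actually sees, regardless of what downstream transforms do to it.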
I think the first task is to ensure mutually exclusive key processing, then to deal with the actual state handling.
Under #29650.
Blocker for #28187
State support is conceptually straightforward.