[prism] Processing Time handling (TestStream, Timers, ProcessContinuations) #30083

Open
Tracked by #29650
lostluck opened this issue Jan 23, 2024 · 0 comments
lostluck commented Jan 23, 2024

Sharded out this batch of ProcessingTime features to be completed together once TestStream (#29917) is in, and to allow #29772 to be closed.

Once TestStream is in, there's a very clear hook for synthetically advancing the Processing Time for a pipeline, and we must now make use of it, in particular for handling Processing Time Timers and executing ProcessContinuations. Processing Time Triggers will build upon the implementation we end up with from this work, and are out of scope for this issue.

For ProcessContinuations, ProcessingTime is a runner-relative time: the user is scheduling a recommendation about when the runner should perform the next action WRT wall time. For ProcessingTime timers, an "absolute" time is being provided based on the SDK worker's clock. This means clock skew will inevitably be introduced.

In a production focused runner, the goal is of course to adhere to these targets as closely as possible for predictable execution.

However, prism isn't currently intending to be a production focused runner. It's intending to be a test runner, which means we can largely rely on a notion of synthetic time. There are 3 cases to consider:

  • When operating in a production mode.
  • When operating in a test mode.
  • When there's a TestStream in the pipeline.

In a production mode, this can default to using time.Now() for advancing the real-time clock. Nothing special in this case. An advantage of this is being able to use facilities from Go's time package (such as the time.Time type) more easily in prism if we need to.

In a test mode, it's valuable for the behavior tests to execute fast, and faster than normal. This is essentially how Prism executes presently for ProcessContinuations. The residuals are scheduled to be reprocessed immediately like any other inputs, no waiting involved. Assuming we have a queue of ProcessingTime elements, we would want to maintain this behavior somehow.

When a TestStream is in the pipeline, Prism should use the synthetic clock in TestStream while there are events remaining in the queue. Once the final event has occurred, the synthetic clock should fall back to the Test behavior above.
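
The split between a real clock and a synthetic clock could be sketched roughly as follows. This is only an illustration of the idea, not Prism's actual code: the `Clock` interface, `realClock`, `syntheticClock`, and `newClock` are all hypothetical names introduced here.

```go
package main

import (
	"fmt"
	"time"
)

// Clock is a hypothetical source of processing time for the runner.
type Clock interface {
	Now() time.Time
}

// realClock follows the wall clock, as in production mode.
type realClock struct{}

func (realClock) Now() time.Time { return time.Now() }

// syntheticClock only moves when explicitly advanced, as in test mode
// or while TestStream events are driving the pipeline.
type syntheticClock struct {
	now time.Time
}

func (c *syntheticClock) Now() time.Time { return c.now }

// Advance moves the clock forward by d. Non-positive durations are
// ignored so that processing time never moves backwards.
func (c *syntheticClock) Advance(d time.Duration) {
	if d > 0 {
		c.now = c.now.Add(d)
	}
}

// newClock picks a clock based on a hypothetical test-mode flag.
func newClock(testMode bool) Clock {
	if testMode {
		return &syntheticClock{now: time.Unix(0, 0)}
	}
	return realClock{}
}

func main() {
	c := newClock(true).(*syntheticClock)
	start := c.Now()
	c.Advance(10 * time.Second) // no real waiting happens here
	fmt.Println(c.Now().Sub(start)) // prints 10s
}
```

With a synthetic clock, "waiting" ten seconds is just an assignment, which is what lets test-mode pipelines keep executing quickly.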


I will note the other place "time" is implicitly considered in Prism is with the Progress loop during bundle processing. Presently that loop uses a ticker and does ordinary splits and such. That progress loop will not be considered for using the synthetic clock since it's important for prism to behave normally WRT the SDK in most ways. The split algorithm shouldn't need to be adjusted when the synthetic clock is in use, but there may be something I'm missing.


For handling Relative cases (ProcessContinuations and Triggers), the handling is straightforward: When we receive them, we calculate their firing time based on the current clock (synthetic or real). The Element Manager then needs to introduce them when that clock reaches their firing time.
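
A minimal sketch of that relative handling, assuming a min-heap keyed on firing time. The `ptEvent`, `ptQueue`, `schedule`, and `due` names are hypothetical and chosen for illustration; only `container/heap` and `time` are real library packages.

```go
package main

import (
	"container/heap"
	"fmt"
	"time"
)

// ptEvent is a hypothetical pending processing-time event.
type ptEvent struct {
	fireAt time.Time
	name   string
}

// ptQueue is a min-heap of events ordered by firing time.
type ptQueue []ptEvent

func (q ptQueue) Len() int           { return len(q) }
func (q ptQueue) Less(i, j int) bool { return q[i].fireAt.Before(q[j].fireAt) }
func (q ptQueue) Swap(i, j int)      { q[i], q[j] = q[j], q[i] }
func (q *ptQueue) Push(x any)        { *q = append(*q, x.(ptEvent)) }
func (q *ptQueue) Pop() any {
	old := *q
	n := len(old)
	ev := old[n-1]
	*q = old[:n-1]
	return ev
}

// schedule turns a relative delay into a firing time against the
// current clock reading (synthetic or real) and enqueues the event.
func schedule(q *ptQueue, now time.Time, delay time.Duration, name string) {
	heap.Push(q, ptEvent{fireAt: now.Add(delay), name: name})
}

// due pops every event whose firing time the clock has reached.
func due(q *ptQueue, now time.Time) []string {
	var out []string
	for q.Len() > 0 && !(*q)[0].fireAt.After(now) {
		out = append(out, heap.Pop(q).(ptEvent).name)
	}
	return out
}

func main() {
	now := time.Unix(0, 0) // current clock reading
	q := &ptQueue{}
	schedule(q, now, 30*time.Second, "residual-b")
	schedule(q, now, 10*time.Second, "residual-a")
	now = now.Add(10 * time.Second)
	fmt.Println(due(q, now)) // prints [residual-a]
}
```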

Residual Elements do need to hold back the watermark as appropriate, but given such transforms should be setting an Estimated Output Watermark time, this should avoid issues with holding back processing time.

ProcessingTime Timers are a complication because they are set absolutely and not relatively. This means the SDK and user could be imposing difficult constraints on the execution of the pipeline, that may not be the most testable. While this can't support 100% of cases, most TestStream uses are for unit testing within a pipeline, which usually has the requirement of fast and correct execution. We can make a simplifying assumption that the user genuinely wants to test their code without the true constraints of "real time".

This implies that we need to re-write the received processing time timer to be in terms of the synthetic clock, instead of the real time clock. Since Prism is notionally a single machine local runner, there should not be a great deal of clock skew between SDK workers and the Prism core. We can assume then that skew is going to be small, if any.

Essentially, we take the received absolute timestamp, subtract the current wall time from it to get a relative duration, and then re-apply that duration to the synthetic clock to get the new expected processing time. Then we handle the progression as normal. This is essentially the same as normal relative, except we must calculate the relative duration first. This logic will apply in test mode executions, not production mode executions.
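
The rewrite step might look something like the sketch below. The `syntheticClock` type and its `rewrite` method are hypothetical names, and clamping timers that are already in the past to "fire immediately" is an assumption made here, not something the plan above specifies.

```go
package main

import (
	"fmt"
	"time"
)

// syntheticClock is a hypothetical stand-in for the runner's synthetic
// processing-time clock.
type syntheticClock struct {
	now time.Time
}

// rewrite converts an absolute processing-time timer, set against the
// SDK worker's wall clock, into a firing time on the synthetic clock.
func (c *syntheticClock) rewrite(absolute, wallNow time.Time) time.Time {
	// Recover the relative duration the user effectively asked for.
	delay := absolute.Sub(wallNow)
	if delay < 0 {
		// Assumption: a timer already in the past fires immediately.
		delay = 0
	}
	// Re-apply that duration to the synthetic clock.
	return c.now.Add(delay)
}

func main() {
	clock := &syntheticClock{now: time.Unix(0, 0).UTC()}
	wallNow := time.Now()
	timer := wallNow.Add(5 * time.Minute) // what the SDK asked for
	fireAt := clock.rewrite(timer, wallNow)
	fmt.Println(fireAt.Sub(clock.now)) // prints 5m0s
}
```

Any skew between the SDK worker's clock and the runner's reading of wall time ends up in `delay`, which is why the small-skew assumption for a single-machine runner matters.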

This approach should work in most cases, but may still lead to flaky behavior for non-conforming tests.

In test mode executions, we can continue to use the same notion of synthetic time, but use the "event" queue for the various ProcessingTime based events to define advancement, since we already have defined the ProcessingTime events to be against the synthetic clock. To a first approximation, it should happen similarly to when TestStream events fire: when runner processing is blocked with pending events, but nothing is in progress. This can happen "one event at a time" as well, and must be prior to the TestStream event handling, since those must happen when no further processing is possible.
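
One reading of that ordering, sketched as a tiny decision function. Everything here (`runnerState`, `action`, `schedule`, `next`) is hypothetical and heavily simplified; it only illustrates "advance the synthetic clock one event at a time when stalled, and only then fall through to TestStream events".

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

type action string

const (
	processReady   action = "process ready elements"
	waitForBundles action = "wait for in-progress bundles"
	fireProcTime   action = "advance clock and fire processing-time event"
	fireTestStream action = "fire TestStream event"
	done           action = "done"
)

// runnerState is a simplified, hypothetical view of the runner.
type runnerState struct {
	now        time.Duration   // synthetic processing time, as an offset from start
	ready      int             // elements processable without advancing time
	inProgress int             // bundles currently executing
	procTime   []time.Duration // pending firing times, kept sorted
	testStream int             // remaining TestStream events
}

// schedule adds a processing-time event at the given synthetic time.
func (s *runnerState) schedule(at time.Duration) {
	s.procTime = append(s.procTime, at)
	sort.Slice(s.procTime, func(i, j int) bool { return s.procTime[i] < s.procTime[j] })
}

// next decides what the runner does next.
func (s *runnerState) next() action {
	switch {
	case s.ready > 0:
		s.ready--
		return processReady
	case s.inProgress > 0:
		return waitForBundles
	case len(s.procTime) > 0:
		// Stalled: jump the clock to the earliest event, one at a time.
		if at := s.procTime[0]; at > s.now {
			s.now = at
		}
		s.procTime = s.procTime[1:]
		return fireProcTime
	case s.testStream > 0:
		// Only reached when no further processing is possible.
		s.testStream--
		return fireTestStream
	default:
		return done
	}
}

func main() {
	s := &runnerState{ready: 1, testStream: 1}
	s.schedule(30 * time.Second)
	s.schedule(10 * time.Second)
	for a := s.next(); a != done; a = s.next() {
		fmt.Println(s.now, a)
	}
}
```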

@lostluck lostluck changed the title Advance Processing Time handling [prism] Processing Time handling (TestStream, Timers, ProcessContinuations) Jan 23, 2024
@lostluck lostluck self-assigned this Jan 23, 2024
@lostluck lostluck added the go label Feb 15, 2024
lostluck added a commit that referenced this issue Apr 22, 2024
* Stabilize additional teststream cases.

* Update sdks/go/test/integration/primitives/teststream_test.go

Co-authored-by: Ritesh Ghorse <riteshghorse@gmail.com>

* Update sdks/go/test/integration/primitives/teststream.go

Co-authored-by: Ritesh Ghorse <riteshghorse@gmail.com>

* Update sdks/go/test/integration/primitives/teststream_test.go

Co-authored-by: Ritesh Ghorse <riteshghorse@gmail.com>

---------

Co-authored-by: lostluck <13907733+lostluck@users.noreply.github.com>
Co-authored-by: Ritesh Ghorse <riteshghorse@gmail.com>
lostluck added a commit that referenced this issue Apr 26, 2024
)

* [prism] Factor out hold tracking to dedicated structures

* review comment-reorder move code out of ladder.

---------

Co-authored-by: lostluck <13907733+lostluck@users.noreply.github.com>
lostluck added a commit that referenced this issue May 29, 2024
* [prism] Add basic processing time queue.

* Initial residual handling refactor.

* Re-work teststream initilization. Remove pending element race.

* touch up

* rm merge duplicate

* Simplify watermark hold tracking.

* First successful run!

* Remove duplicated test run.

* Deduplicate processing time heap.

* rm debug text

* Remove some debug prints, cleanup.

* tiny todo cleanup

* ProcessingTime workming most of the time!

* Some cleanup

* try to get github suite to pass #1

* touch

* reduce counts a bit, filter tests some.

* Clean up unrelated state changes. Clean up comments somewhat.

* Filter out dataflow incompatible test.

* Refine processing time event comment.

* Remove test touch.

---------

Co-authored-by: lostluck <13907733+lostluck@users.noreply.github.com>