Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sequencer: Extract staging behavior from core #1019

Merged
merged 1 commit into from
Sep 24, 2024
Merged

Conversation

bobvawter
Copy link
Member

@bobvawter bobvawter commented Sep 20, 2024

This change allows the source of staging data provided to the Core sequencer to
be configured. The StagingCursor type is renamed to BatchCursor and a new
BatchReader interface defines a way to receive an asynchronous stream of cursors.

All existing code is updated to inject a Staging sequencer into the stack. This
sequencer provides an acceptor which writes incoming data to the staging tables
and injects the existing staging-reader implementation into Core.

The Core sequencer's acceptor is now a no-op. The workload checker is updated
to support providing data via BatchReader. This allows Core to be tested
without use of any staging tables.


This change is Reviewable

Copy link
Member

@BramGruneir BramGruneir left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:lgtm:

Reviewed 31 of 31 files at r1, all commit messages.
Reviewable status: all files reviewed, 4 unresolved discussions (waiting on @bobvawter, @Jeremyyang920, @sravotto, and @ZhouXing19)


internal/sequencer/core/core.go line 41 at r1 (raw file):

// The Core sequencer applies transactional data read from a
// [types.BatchReader] in a possibly-concurrent manner.

Can you expland on this comment about possible-concurrency?


internal/sequencer/core/core.go line 114 at r1 (raw file):

						_, _, _ = progress.Update(func(stat sequencer.Stat) (sequencer.Stat, error) {
							stat = stat.Copy()
							if len(group.Tables) == 0 {

Is there a test for this?


internal/sequencer/staging/acceptor.go line 58 at r1 (raw file):

	ctx context.Context, batch *types.TableBatch, opts *types.AcceptOptions,
) error {
	stager, err := a.stagers.Get(ctx, batch.Table)

Isn't this the same as the inner loop of Accept MultiBatch (with a change in varialbe for stage?
Why not have them both call the same function?


internal/sequencer/staging/staging.go line 68 at r1 (raw file):

	opts.Delegate = s.markers.MultiAcceptor(opts.Delegate)

	if err != nil {

Why is this after setting the delegate?

Copy link
Member Author

@bobvawter bobvawter left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: all files reviewed, 4 unresolved discussions (waiting on @BramGruneir, @Jeremyyang920, @sravotto, and @ZhouXing19)


internal/sequencer/core/core.go line 41 at r1 (raw file):

Previously, BramGruneir (Bram Gruneir) wrote…

Can you expland on this comment about possible-concurrency?

Done.


internal/sequencer/core/core.go line 114 at r1 (raw file):

Previously, BramGruneir (Bram Gruneir) wrote…

Is there a test for this?

The zero-table Group is exercised in the next commit, which switches pglogical over to to using Core.


internal/sequencer/staging/acceptor.go line 58 at r1 (raw file):

Previously, BramGruneir (Bram Gruneir) wrote…

Isn't this the same as the inner loop of Accept MultiBatch (with a change in varialbe for stage?
Why not have them both call the same function?

It could be in go 1.23 with the iter package. For the moment, there are enough differences that trying to use a common function was more trouble that it's worth.


internal/sequencer/staging/staging.go line 68 at r1 (raw file):

Previously, BramGruneir (Bram Gruneir) wrote…

Why is this after setting the delegate?

Moved the call to Query() to a local variable declaration above the call to Copy().

This change allows the source of staging data provided to the Core sequencer to
be configured. The StagingCursor type is renamed to BatchCursor and a new
BatchReader interface defines a way to receive an asynchronous stream of cursors.

All existing code is updated to inject a Staging sequencer into the stack. This
sequencer provides an acceptor which writes incoming data to the staging tables
and injects the existing staging-reader implementation into Core.

The Core sequencer's acceptor is now a no-op. The workload checker is updated
to support providing data via BatchReader. This allows Core to be tested
without use of any staging tables.
@bobvawter bobvawter added this pull request to the merge queue Sep 24, 2024
Merged via the queue into master with commit 2edda54 Sep 24, 2024
51 of 52 checks passed
@bobvawter bobvawter deleted the bob_core_reader branch September 24, 2024 17:27
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants